mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
24 Commits
release-pr
...
ps-trace
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d7bce5f72 | ||
|
|
42a0f5be00 | ||
|
|
d4065f2e85 | ||
|
|
5315614974 | ||
|
|
293bc69e4f | ||
|
|
e9b158e8f5 | ||
|
|
8c09ff1764 | ||
|
|
a66a3b4ed8 | ||
|
|
6913bedc09 | ||
|
|
8a43dfd573 | ||
|
|
065adcf75b | ||
|
|
a180273dc5 | ||
|
|
6c6aa04ce4 | ||
|
|
cd8c96233f | ||
|
|
ecbda94790 | ||
|
|
0d6d8fefd3 | ||
|
|
d296a76e3e | ||
|
|
646e0f3581 | ||
|
|
0bfc422eb3 | ||
|
|
1209572cec | ||
|
|
42f603cb13 | ||
|
|
6bcfd6441f | ||
|
|
dedb03bb5a | ||
|
|
abb07df028 |
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -2059,6 +2059,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"svg_fmt",
|
||||
"tar",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
@@ -3333,6 +3334,12 @@ version = "2.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
|
||||
|
||||
[[package]]
|
||||
name = "svg_fmt"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-common"
|
||||
version = "8.8.0"
|
||||
|
||||
@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
|
||||
let url = self.url().to_owned();
|
||||
Err(PageserverHttpError::Response(
|
||||
match self.json::<HttpErrorBody>() {
|
||||
Ok(err_body) => format!("Error: {}", err_body.msg),
|
||||
Ok(err_body) => format!("Response error: {}", err_body.msg),
|
||||
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||
},
|
||||
))
|
||||
@@ -181,14 +181,15 @@ impl PageServerNode {
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<TimelineId> {
|
||||
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
|
||||
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
|
||||
.context("Failed to create tenant")?;
|
||||
let initial_timeline_info = self.timeline_create(
|
||||
initial_tenant_id,
|
||||
new_timeline_id,
|
||||
None,
|
||||
None,
|
||||
Some(pg_version),
|
||||
)?;
|
||||
).context("Failed to create timeline")?;
|
||||
Ok(initial_timeline_info.timeline_id)
|
||||
}
|
||||
|
||||
@@ -419,6 +420,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -52,6 +52,7 @@ pub struct TenantCreateRequest {
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub trace_read_requests: Option<bool>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
|
||||
@@ -67,6 +67,7 @@ remote_storage = { path = "../libs/remote_storage" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
close_fds = "0.3.2"
|
||||
walkdir = "2.3.2"
|
||||
svg_fmt = "0.4.1"
|
||||
|
||||
[dev-dependencies]
|
||||
hex-literal = "0.3"
|
||||
|
||||
185
pageserver/src/bin/draw_trace.rs
Normal file
185
pageserver/src/bin/draw_trace.rs
Normal file
@@ -0,0 +1,185 @@
|
||||
use clap::{App, Arg};
|
||||
use futures::TryFutureExt;
|
||||
use pageserver::{page_service::PagestreamFeMessage, repository::Key};
|
||||
|
||||
use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
|
||||
use std::io::Write;
|
||||
use std::{
|
||||
fs::{read_dir, File},
|
||||
io::BufReader,
|
||||
str::FromStr,
|
||||
};
|
||||
use svg_fmt::*;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
pq_proto::{BeMessage, FeMessage},
|
||||
};
|
||||
|
||||
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
|
||||
let set: BTreeSet<T> = coords.into_iter().collect();
|
||||
|
||||
let mut map: BTreeMap<T, usize> = BTreeMap::new();
|
||||
for (i, e) in set.iter().enumerate() {
|
||||
map.insert(*e, i);
|
||||
}
|
||||
|
||||
(set.len(), map)
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
// TODO upgrade to struct macro arg parsing
|
||||
let arg_matches = App::new("Pageserver trace visualization tool")
|
||||
.about("Makes a svg file that displays the read pattern")
|
||||
.arg(
|
||||
Arg::new("traces_dir")
|
||||
.takes_value(true)
|
||||
.help("Directory where the read traces are stored"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
// (blkno, lsn)
|
||||
let mut dots = Vec::<(u32, Lsn)>::new();
|
||||
|
||||
|
||||
let mut dump_file = File::create("dump.txt").expect("can't make file");
|
||||
let mut deltas = HashMap::<i32, u32>::new();
|
||||
let mut prev1: u32 = 0;
|
||||
let mut prev2: u32 = 0;
|
||||
let mut prev3: u32 = 0;
|
||||
|
||||
println!("scanning trace ...");
|
||||
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
|
||||
for tenant_dir in read_dir(traces_dir)? {
|
||||
let entry = tenant_dir?;
|
||||
let path = entry.path();
|
||||
// let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
// println!("tenant: {tenant_id}");
|
||||
|
||||
println!("opening {path:?}");
|
||||
for timeline_dir in read_dir(path)? {
|
||||
let entry = timeline_dir?;
|
||||
let path = entry.path();
|
||||
// let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
// println!("hi");
|
||||
// println!("timeline: {timeline_id}");
|
||||
|
||||
println!("opening {path:?}");
|
||||
for trace_dir in read_dir(path)? {
|
||||
let entry = trace_dir?;
|
||||
let path = entry.path();
|
||||
// let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
println!("opening {path:?}");
|
||||
let file = File::open(path.clone())?;
|
||||
let mut reader = BufReader::new(file);
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
|
||||
// println!("Parsed message {:?}", msg);
|
||||
match msg {
|
||||
PagestreamFeMessage::Exists(_) => {}
|
||||
PagestreamFeMessage::Nblocks(_) => {}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
|
||||
// dots.push((req.blkno, req.lsn));
|
||||
// HACK
|
||||
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
|
||||
|
||||
let delta1 = (req.blkno as i32) - (prev1 as i32);
|
||||
let delta2 = (req.blkno as i32) - (prev2 as i32);
|
||||
let delta3 = (req.blkno as i32) - (prev3 as i32);
|
||||
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
|
||||
delta1
|
||||
} else {
|
||||
delta2
|
||||
};
|
||||
if i32::abs(delta3) < i32::abs(delta) {
|
||||
delta = delta3;
|
||||
}
|
||||
let delta = delta1;
|
||||
|
||||
prev3 = prev2;
|
||||
prev2 = prev1;
|
||||
prev1 = req.blkno;
|
||||
|
||||
match deltas.get_mut(&delta) {
|
||||
Some(c) => {*c += 1;},
|
||||
None => {deltas.insert(delta, 1);},
|
||||
};
|
||||
|
||||
if delta == 9 {
|
||||
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
|
||||
}
|
||||
},
|
||||
PagestreamFeMessage::DbSize(_) => {}
|
||||
};
|
||||
|
||||
// HACK
|
||||
// if dots.len() > 1000 {
|
||||
// break;
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut other = deltas.len();
|
||||
deltas.retain(|_, count| *count > 3);
|
||||
other -= deltas.len();
|
||||
dbg!(other);
|
||||
dbg!(deltas);
|
||||
|
||||
// Collect all coordinates
|
||||
let mut keys: Vec<u32> = vec![];
|
||||
let mut lsns: Vec<Lsn> = vec![];
|
||||
for dot in &dots {
|
||||
keys.push(dot.0);
|
||||
lsns.push(dot.1);
|
||||
}
|
||||
|
||||
// Analyze
|
||||
let (key_max, key_map) = analyze(keys);
|
||||
let (lsn_max, lsn_map) = analyze(lsns);
|
||||
|
||||
// Draw
|
||||
println!("drawing trace ...");
|
||||
let mut svg_file = File::create("out.svg").expect("can't make file");
|
||||
writeln!(
|
||||
&mut svg_file,
|
||||
"{}",
|
||||
BeginSvg {
|
||||
w: (key_max + 1) as f32,
|
||||
h: (lsn_max + 1) as f32,
|
||||
}
|
||||
)?;
|
||||
for (key, lsn) in &dots {
|
||||
let key = key_map.get(&key).unwrap();
|
||||
let lsn = lsn_map.get(&lsn).unwrap();
|
||||
writeln!(
|
||||
&mut svg_file,
|
||||
" {}",
|
||||
rectangle(
|
||||
*key as f32,
|
||||
*lsn as f32,
|
||||
10.0,
|
||||
10.0
|
||||
)
|
||||
.fill(Fill::Color(red()))
|
||||
.stroke(Stroke::Color(black(), 0.0))
|
||||
.border_radius(0.5)
|
||||
)?;
|
||||
// println!(" {}",
|
||||
// rectangle(key_start as f32 + stretch * margin,
|
||||
// stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
|
||||
// key_diff as f32 - stretch * 2.0 * margin,
|
||||
// stretch * (lsn_diff - 2.0 * margin))
|
||||
// // .fill(rgb(200, 200, 200))
|
||||
// .fill(fill)
|
||||
// .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
|
||||
// .border_radius(0.4)
|
||||
// );
|
||||
}
|
||||
|
||||
writeln!(&mut svg_file, "{}", EndSvg)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
133
pageserver/src/bin/replay.rs
Normal file
133
pageserver/src/bin/replay.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
use bytes::BytesMut;
|
||||
use pageserver::page_service::PagestreamFeMessage;
|
||||
use std::{
|
||||
fs::{read_dir, File},
|
||||
io::BufReader,
|
||||
path::PathBuf,
|
||||
str::FromStr,
|
||||
};
|
||||
use tokio::{io::AsyncWriteExt, net::TcpStream};
|
||||
|
||||
use clap::{App, Arg};
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
pq_proto::{BeMessage, FeMessage},
|
||||
};
|
||||
|
||||
// TODO put this in library, dedup with stuff in control_plane
|
||||
/// Client for the pageserver's pagestream API
|
||||
struct PagestreamApi {
|
||||
stream: TcpStream,
|
||||
}
|
||||
|
||||
impl PagestreamApi {
|
||||
async fn connect(
|
||||
connstr: &str,
|
||||
tenant: &TenantId,
|
||||
timeline: &TimelineId,
|
||||
) -> anyhow::Result<PagestreamApi> {
|
||||
// Parse connstr
|
||||
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
|
||||
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
|
||||
|
||||
// Connect
|
||||
let mut stream = TcpStream::connect(tcp_addr).await?;
|
||||
let (client, conn) = config
|
||||
.connect_raw(&mut stream, tokio_postgres::NoTls)
|
||||
.await?;
|
||||
|
||||
// Enter pagestream protocol
|
||||
let init_query = format!("pagestream {} {}", tenant, timeline);
|
||||
tokio::select! {
|
||||
_ = conn => panic!("connection closed during pagestream initialization"),
|
||||
_ = client.query(init_query.as_str(), &[]) => (),
|
||||
};
|
||||
|
||||
Ok(PagestreamApi { stream })
|
||||
}
|
||||
|
||||
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
|
||||
let request = {
|
||||
let msg_bytes = msg.serialize();
|
||||
let mut buf = BytesMut::new();
|
||||
let copy_msg = BeMessage::CopyData(&msg_bytes);
|
||||
|
||||
// TODO it's actually a fe message but it doesn't have a serializer yet
|
||||
BeMessage::write(&mut buf, ©_msg)?;
|
||||
buf.freeze()
|
||||
};
|
||||
self.stream.write_all(&request).await?;
|
||||
|
||||
// TODO It's actually a be message, but it doesn't have a parser.
|
||||
// So error response (code b'E' parses incorrectly as FeExecuteMessage)
|
||||
let _response = match FeMessage::read_fut(&mut self.stream).await? {
|
||||
Some(FeMessage::CopyData(page)) => page,
|
||||
r => panic!("Expected CopyData message, got: {:?}", r),
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn replay_trace<R: std::io::Read>(
|
||||
reader: &mut R,
|
||||
mut pagestream: PagestreamApi,
|
||||
) -> anyhow::Result<()> {
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
|
||||
println!("Parsed message {:?}", msg);
|
||||
pagestream.make_request(msg).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// TODO upgrade to struct macro arg parsing
|
||||
let arg_matches = App::new("Pageserver trace replay tool")
|
||||
.about("Replays wal or read traces to test pageserver performance")
|
||||
.arg(
|
||||
Arg::new("traces_dir")
|
||||
.takes_value(true)
|
||||
.help("Directory where the read traces are stored"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("pageserver_connstr")
|
||||
.takes_value(true)
|
||||
.help("Pageserver pg endpoint to connect to"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
|
||||
|
||||
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
|
||||
for tenant_dir in read_dir(traces_dir)? {
|
||||
let entry = tenant_dir?;
|
||||
let path = entry.path();
|
||||
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
for timeline_dir in read_dir(path)? {
|
||||
let entry = timeline_dir?;
|
||||
let path = entry.path();
|
||||
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
for trace_dir in read_dir(path)? {
|
||||
let entry = trace_dir?;
|
||||
let path = entry.path();
|
||||
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||
|
||||
// TODO The pageserver deletes existing traces?
|
||||
// LOL yes because I use tenant ID as trace id
|
||||
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
|
||||
|
||||
let file = File::open(path.clone())?;
|
||||
let mut reader = BufReader::new(file);
|
||||
// let len = file.metadata().unwrap().len();
|
||||
// println!("replaying {:?} trace {} bytes", path, len);
|
||||
replay_trace(&mut reader, pagestream).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -364,6 +364,23 @@ impl PageServerConf {
|
||||
self.timelines_path(tenant_id).join(timeline_id.to_string())
|
||||
}
|
||||
|
||||
pub fn traces_path(&self) -> PathBuf {
|
||||
self.workdir.join("traces")
|
||||
}
|
||||
|
||||
pub fn trace_path(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
connection_id: &TimelineId, // TODO make a new type
|
||||
) -> PathBuf {
|
||||
self.traces_path()
|
||||
.join(tenant_id.to_string())
|
||||
.join(timeline_id.to_string())
|
||||
.join(connection_id.to_string())
|
||||
|
||||
}
|
||||
|
||||
/// Points to a place in pageserver's local directory,
|
||||
/// where certain timeline's metadata file should be located.
|
||||
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {
|
||||
|
||||
@@ -617,6 +617,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
|
||||
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||
}
|
||||
if let Some(trace_read_requests) = request_data.trace_read_requests {
|
||||
tenant_conf.trace_read_requests = Some(trace_read_requests);
|
||||
}
|
||||
|
||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
|
||||
|
||||
@@ -16,6 +16,8 @@ pub mod tenant;
|
||||
pub mod tenant_config;
|
||||
pub mod tenant_mgr;
|
||||
pub mod tenant_tasks;
|
||||
// pub mod timelines;
|
||||
pub mod trace;
|
||||
pub mod virtual_file;
|
||||
pub mod walingest;
|
||||
pub mod walreceiver;
|
||||
|
||||
@@ -10,7 +10,9 @@
|
||||
//
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::Buf;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use futures::{Stream, StreamExt};
|
||||
use regex::Regex;
|
||||
use std::io;
|
||||
@@ -42,6 +44,7 @@ use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant_mgr;
|
||||
use crate::trace::Tracer;
|
||||
use crate::CheckpointConfig;
|
||||
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
@@ -49,7 +52,9 @@ use postgres_ffi::to_pg_timestamp;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
enum PagestreamFeMessage {
|
||||
// TODO these should be in a library outside the pageserver
|
||||
#[derive(Debug)]
|
||||
pub enum PagestreamFeMessage {
|
||||
Exists(PagestreamExistsRequest),
|
||||
Nblocks(PagestreamNblocksRequest),
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
@@ -57,7 +62,7 @@ enum PagestreamFeMessage {
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
enum PagestreamBeMessage {
|
||||
pub enum PagestreamBeMessage {
|
||||
Exists(PagestreamExistsResponse),
|
||||
Nblocks(PagestreamNblocksResponse),
|
||||
GetPage(PagestreamGetPageResponse),
|
||||
@@ -66,106 +71,152 @@ enum PagestreamBeMessage {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamExistsRequest {
|
||||
pub struct PagestreamExistsRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamNblocksRequest {
|
||||
pub struct PagestreamNblocksRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamGetPageRequest {
|
||||
pub struct PagestreamGetPageRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
blkno: u32,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamDbSizeRequest {
|
||||
pub struct PagestreamDbSizeRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
dbnode: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamExistsResponse {
|
||||
pub struct PagestreamExistsResponse {
|
||||
exists: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamNblocksResponse {
|
||||
pub struct PagestreamNblocksResponse {
|
||||
n_blocks: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamGetPageResponse {
|
||||
pub struct PagestreamGetPageResponse {
|
||||
page: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamErrorResponse {
|
||||
pub struct PagestreamErrorResponse {
|
||||
message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamDbSizeResponse {
|
||||
pub struct PagestreamDbSizeResponse {
|
||||
db_size: i64,
|
||||
}
|
||||
|
||||
impl PagestreamFeMessage {
|
||||
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
|
||||
pub fn serialize(&self) -> Bytes {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
match self {
|
||||
Self::Exists(req) => {
|
||||
bytes.put_u8(0);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::Nblocks(req) => {
|
||||
bytes.put_u8(1);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::GetPage(req) => {
|
||||
bytes.put_u8(2);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
}
|
||||
|
||||
Self::DbSize(req) => {
|
||||
bytes.put_u8(3);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.dbnode);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
|
||||
// TODO these gets can fail
|
||||
|
||||
// these correspond to the NeonMessageTag enum in pagestore_client.h
|
||||
//
|
||||
// TODO: consider using protobuf or serde bincode for less error prone
|
||||
// serialization.
|
||||
let msg_tag = body.get_u8();
|
||||
let msg_tag = body.read_u8()?;
|
||||
match msg_tag {
|
||||
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
})),
|
||||
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
})),
|
||||
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.get_u32(),
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
dbnode: body.get_u32(),
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
|
||||
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -422,6 +473,17 @@ impl PageServerHandler {
|
||||
// so there is no need to reset the association
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
|
||||
// Make request tracer if needed
|
||||
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
|
||||
let mut tracer = if tenant.get_trace_read_requests() {
|
||||
let path = tenant
|
||||
.conf
|
||||
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
|
||||
Some(Tracer::new(path))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = get_local_timeline(tenant_id, timeline_id)?;
|
||||
|
||||
@@ -446,15 +508,24 @@ impl PageServerHandler {
|
||||
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Sync) => {
|
||||
// TODO what now?
|
||||
continue;
|
||||
}
|
||||
Some(m) => {
|
||||
bail!("unexpected message: {m:?} during COPY");
|
||||
}
|
||||
None => break, // client disconnected
|
||||
};
|
||||
|
||||
// Trace request if needed
|
||||
if let Some(t) = tracer.as_mut() {
|
||||
t.trace(©_data_bytes)
|
||||
}
|
||||
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
|
||||
let response = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
|
||||
@@ -593,6 +593,14 @@ impl Tenant {
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
|
||||
}
|
||||
|
||||
// TODO is there a need for all this boilerplate?
|
||||
pub fn get_trace_read_requests(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.trace_read_requests
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
|
||||
}
|
||||
|
||||
@@ -82,6 +82,10 @@ pub struct TenantConf {
|
||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||
/// to avoid eager reconnects.
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
/// If enabled, records all read requests for this ... TODO
|
||||
/// Even though read traces are small, there's no delete meachanism so they can
|
||||
/// pile up. So we disable this by default.
|
||||
pub trace_read_requests: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -105,6 +109,7 @@ pub struct TenantConfOpt {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub lagging_wal_timeout: Option<Duration>,
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
pub trace_read_requests: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -138,6 +143,9 @@ impl TenantConfOpt {
|
||||
.lagging_wal_timeout
|
||||
.unwrap_or(global_conf.lagging_wal_timeout),
|
||||
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
|
||||
trace_read_requests: self
|
||||
.trace_read_requests
|
||||
.unwrap_or(global_conf.trace_read_requests),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,6 +215,7 @@ impl TenantConf {
|
||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||
trace_read_requests: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,6 +241,7 @@ impl TenantConf {
|
||||
.unwrap(),
|
||||
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||
.unwrap(),
|
||||
trace_read_requests: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
36
pageserver/src/trace.rs
Normal file
36
pageserver/src/trace.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
use bytes::Bytes;
|
||||
use std::{
|
||||
fs::{create_dir_all, File},
|
||||
io::{BufWriter, Write},
|
||||
path::PathBuf,
|
||||
};
|
||||
|
||||
pub struct Tracer {
|
||||
writer: BufWriter<File>,
|
||||
}
|
||||
|
||||
impl Drop for Tracer {
|
||||
fn drop(&mut self) {
|
||||
self.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl Tracer {
|
||||
pub fn new(path: PathBuf) -> Self {
|
||||
let parent = path.parent().expect("failed to parse parent path");
|
||||
create_dir_all(parent).expect("failed to create trace dir");
|
||||
|
||||
let file = File::create(path).expect("failed to create trace file");
|
||||
Tracer {
|
||||
writer: BufWriter::new(file),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trace(&mut self, msg: &Bytes) {
|
||||
self.writer.write(msg).expect("failed to write trace");
|
||||
}
|
||||
|
||||
pub fn flush(&mut self) {
|
||||
self.writer.flush().expect("failed to flush trace file");
|
||||
}
|
||||
}
|
||||
@@ -264,12 +264,76 @@ pageserver_flush(void)
|
||||
}
|
||||
}
|
||||
|
||||
// Entry of the page_cache
|
||||
typedef struct
|
||||
{
|
||||
// HACK Just xor of bytes lol
|
||||
char request_hash;
|
||||
|
||||
// Points directly to a NeonResponse. We can't just own the
|
||||
// NeonResponse because it's a "supertype", so it's not Sized.
|
||||
char* response;
|
||||
int len;
|
||||
} NeonRequestResponse;
|
||||
|
||||
#define MAX_PAGE_CACHE_SIZE 50
|
||||
NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
|
||||
int page_cache_size = 0;
|
||||
int page_cache_head = 0;
|
||||
|
||||
static NeonResponse *
|
||||
pageserver_call(NeonRequest * request)
|
||||
{
|
||||
// Compute hash
|
||||
char hash = 0;
|
||||
StringInfoData req_buff;
|
||||
req_buff = nm_pack_request(request);
|
||||
for (int i = 0; i < req_buff.len; i++) {
|
||||
hash ^= req_buff.data[i];
|
||||
}
|
||||
pfree(req_buff.data);
|
||||
|
||||
// If result is cached, memcpy and return
|
||||
for (int i = 0; i < page_cache_size; i++) {
|
||||
if (page_cache[i].request_hash == hash) {
|
||||
int len = page_cache[0].response->len;
|
||||
NeonResponse *resp = palloc0(len);
|
||||
// I'd rather Rc than memcpy, but this is not rust :(
|
||||
memcpy(resp, page_cache[0].response->data, len);
|
||||
elog(LOG, "cache hit !!!");
|
||||
return resp;
|
||||
}
|
||||
}
|
||||
|
||||
// Send request, get response
|
||||
pageserver_send(request);
|
||||
pageserver_flush();
|
||||
return pageserver_receive();
|
||||
NeonResponse *resp = pageserver_receive();
|
||||
|
||||
// Get length
|
||||
int len = -1;
|
||||
switch (resp->tag) {
|
||||
case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); }
|
||||
case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); }
|
||||
case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; }
|
||||
case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); }
|
||||
case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); }
|
||||
}
|
||||
|
||||
// Cache result
|
||||
page_cache[page_cache_head].request_hash = hash;
|
||||
if (page_cache_head < page_cache_size) {
|
||||
pfree(page_cache[page_cache_head].response);
|
||||
}
|
||||
page_cache[page_cache_head].len = len;
|
||||
page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
|
||||
memcpy(page_cache[page_cache_head].response, resp, len);
|
||||
page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
|
||||
if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
|
||||
page_cache_size += 1;
|
||||
}
|
||||
|
||||
return resp;
|
||||
}
|
||||
|
||||
page_server_api api = {
|
||||
|
||||
@@ -91,10 +91,25 @@ class NeonCompare(PgCompare):
|
||||
self._pg_bin = pg_bin
|
||||
self.pageserver_http_client = self.env.pageserver.http_client()
|
||||
|
||||
self.tenant, _ = self.env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"trace_read_requests": "true",
|
||||
}
|
||||
)
|
||||
|
||||
# HACK enable request tracing, as an experiment
|
||||
# NOTE This must be done before the pg starts pagestream
|
||||
# TODO why does it not work?
|
||||
# self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
|
||||
# "trace_read_requests": "true",
|
||||
# })
|
||||
|
||||
# We only use one branch and one timeline
|
||||
self.env.neon_cli.create_branch(branch_name, "empty")
|
||||
self._pg = self.env.postgres.create_start(branch_name)
|
||||
self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
|
||||
# self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
|
||||
# self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
|
||||
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
|
||||
self._pg = self.env.postgres.create_start(
|
||||
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
|
||||
|
||||
@property
|
||||
def pg(self):
|
||||
@@ -109,10 +124,10 @@ class NeonCompare(PgCompare):
|
||||
return self._pg_bin
|
||||
|
||||
def flush(self):
|
||||
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
|
||||
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
|
||||
|
||||
def compact(self):
|
||||
self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
|
||||
self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
|
||||
|
||||
def report_peak_memory_use(self) -> None:
|
||||
self.zenbenchmark.record(
|
||||
@@ -124,7 +139,7 @@ class NeonCompare(PgCompare):
|
||||
|
||||
def report_size(self) -> None:
|
||||
timeline_size = self.zenbenchmark.get_timeline_size(
|
||||
self.env.repo_dir, self.env.initial_tenant, self.timeline
|
||||
self.env.repo_dir, self.tenant, self.timeline
|
||||
)
|
||||
self.zenbenchmark.record(
|
||||
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
|
||||
|
||||
@@ -1749,6 +1749,37 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
|
||||
return PgBin(test_output_dir, pg_version)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReplayBin:
|
||||
"""A helper class for replaying pageserver wal and read traces."""
|
||||
|
||||
traces_dir: str
|
||||
|
||||
def replay_all(self, pageserver_connstr):
|
||||
replay_binpath = os.path.join(str(neon_binpath), "replay")
|
||||
args = [
|
||||
replay_binpath,
|
||||
self.traces_dir,
|
||||
pageserver_connstr,
|
||||
]
|
||||
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
|
||||
subprocess.run(args)
|
||||
|
||||
def draw_all(self):
|
||||
draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
|
||||
args = [
|
||||
draw_binpath,
|
||||
self.traces_dir,
|
||||
]
|
||||
subprocess.run(args)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def replay_bin(test_output_dir):
|
||||
traces_dir = os.path.join(test_output_dir, "repo", "traces")
|
||||
return ReplayBin(traces_dir)
|
||||
|
||||
|
||||
class VanillaPostgres(PgProtocol):
|
||||
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
|
||||
super().__init__(host="localhost", port=port, dbname="postgres")
|
||||
|
||||
@@ -175,6 +175,11 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
|
||||
|
||||
# Run the pgbench tests, and generate a flamegraph from it
|
||||
# This requires that the pageserver was built with the 'profiling' feature.
|
||||
#
|
||||
|
||||
57
test_runner/performance/test_trace_replay.py
Normal file
57
test_runner/performance/test_trace_replay.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
|
||||
|
||||
|
||||
# This test is a demonstration of how to do trace playback. With the current
|
||||
# workload it uses it's not testing anything meaningful.
|
||||
def test_trace_replay(
|
||||
neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
|
||||
):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"trace_read_requests": "true",
|
||||
}
|
||||
)
|
||||
# TODO This doesn't work because I haven't updated tenant_config_handler
|
||||
# env.neon_cli.config_tenant(tenant, conf={
|
||||
# "trace_read_requests": "true",
|
||||
# })
|
||||
env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
|
||||
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
|
||||
|
||||
with zenbenchmark.record_duration("run"):
|
||||
pg.safe_psql("select 1;")
|
||||
pg.safe_psql("select 1;")
|
||||
pg.safe_psql("select 1;")
|
||||
pg.safe_psql("select 1;")
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("create table t (i integer);")
|
||||
cur.execute(f"insert into t values (generate_series(1,{10000}));")
|
||||
cur.execute("select count(*) from t;")
|
||||
|
||||
# Stop pg so we drop the connection and flush the traces
|
||||
pg.stop()
|
||||
|
||||
# TODO This doesn't work because I haven't updated tenant_config_handler
|
||||
# env.neon_cli.config_tenant(tenant, conf={
|
||||
# "trace_read_requests": "false",
|
||||
# })
|
||||
|
||||
# trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
|
||||
# assert trace_path.exists()
|
||||
|
||||
replay_bin.draw_all()
|
||||
return
|
||||
|
||||
print("replaying")
|
||||
ps_connstr = env.pageserver.connstr()
|
||||
with zenbenchmark.record_duration("replay"):
|
||||
output = replay_bin.replay_all(ps_connstr)
|
||||
print(output)
|
||||
@@ -1,6 +1,7 @@
|
||||
from contextlib import closing
|
||||
|
||||
import psycopg2.extras
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
Reference in New Issue
Block a user