mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-05 11:40:37 +00:00
Compare commits
24 Commits
hack/compu
...
ps-trace
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d7bce5f72 | ||
|
|
42a0f5be00 | ||
|
|
d4065f2e85 | ||
|
|
5315614974 | ||
|
|
293bc69e4f | ||
|
|
e9b158e8f5 | ||
|
|
8c09ff1764 | ||
|
|
a66a3b4ed8 | ||
|
|
6913bedc09 | ||
|
|
8a43dfd573 | ||
|
|
065adcf75b | ||
|
|
a180273dc5 | ||
|
|
6c6aa04ce4 | ||
|
|
cd8c96233f | ||
|
|
ecbda94790 | ||
|
|
0d6d8fefd3 | ||
|
|
d296a76e3e | ||
|
|
646e0f3581 | ||
|
|
0bfc422eb3 | ||
|
|
1209572cec | ||
|
|
42f603cb13 | ||
|
|
6bcfd6441f | ||
|
|
dedb03bb5a | ||
|
|
abb07df028 |
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -2059,6 +2059,7 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_with",
|
"serde_with",
|
||||||
"signal-hook",
|
"signal-hook",
|
||||||
|
"svg_fmt",
|
||||||
"tar",
|
"tar",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
@@ -3333,6 +3334,12 @@ version = "2.4.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
|
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "svg_fmt"
|
||||||
|
version = "0.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8fb1df15f412ee2e9dfc1c504260fa695c1c3f10fe9f4a6ee2d2184d7d6450e2"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "symbolic-common"
|
name = "symbolic-common"
|
||||||
version = "8.8.0"
|
version = "8.8.0"
|
||||||
|
|||||||
@@ -61,7 +61,7 @@ impl ResponseErrorMessageExt for Response {
|
|||||||
let url = self.url().to_owned();
|
let url = self.url().to_owned();
|
||||||
Err(PageserverHttpError::Response(
|
Err(PageserverHttpError::Response(
|
||||||
match self.json::<HttpErrorBody>() {
|
match self.json::<HttpErrorBody>() {
|
||||||
Ok(err_body) => format!("Error: {}", err_body.msg),
|
Ok(err_body) => format!("Response error: {}", err_body.msg),
|
||||||
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
@@ -181,14 +181,15 @@ impl PageServerNode {
|
|||||||
new_timeline_id: Option<TimelineId>,
|
new_timeline_id: Option<TimelineId>,
|
||||||
pg_version: u32,
|
pg_version: u32,
|
||||||
) -> anyhow::Result<TimelineId> {
|
) -> anyhow::Result<TimelineId> {
|
||||||
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
|
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
|
||||||
|
.context("Failed to create tenant")?;
|
||||||
let initial_timeline_info = self.timeline_create(
|
let initial_timeline_info = self.timeline_create(
|
||||||
initial_tenant_id,
|
initial_tenant_id,
|
||||||
new_timeline_id,
|
new_timeline_id,
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
Some(pg_version),
|
Some(pg_version),
|
||||||
)?;
|
).context("Failed to create timeline")?;
|
||||||
Ok(initial_timeline_info.timeline_id)
|
Ok(initial_timeline_info.timeline_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -419,6 +420,11 @@ impl PageServerNode {
|
|||||||
.map(|x| x.parse::<NonZeroU64>())
|
.map(|x| x.parse::<NonZeroU64>())
|
||||||
.transpose()
|
.transpose()
|
||||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||||
|
trace_read_requests: settings
|
||||||
|
.remove("trace_read_requests")
|
||||||
|
.map(|x| x.parse::<bool>())
|
||||||
|
.transpose()
|
||||||
|
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||||
};
|
};
|
||||||
if !settings.is_empty() {
|
if !settings.is_empty() {
|
||||||
bail!("Unrecognized tenant settings: {settings:?}")
|
bail!("Unrecognized tenant settings: {settings:?}")
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ pub struct TenantCreateRequest {
|
|||||||
pub walreceiver_connect_timeout: Option<String>,
|
pub walreceiver_connect_timeout: Option<String>,
|
||||||
pub lagging_wal_timeout: Option<String>,
|
pub lagging_wal_timeout: Option<String>,
|
||||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||||
|
pub trace_read_requests: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
|
|||||||
@@ -67,6 +67,7 @@ remote_storage = { path = "../libs/remote_storage" }
|
|||||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||||
close_fds = "0.3.2"
|
close_fds = "0.3.2"
|
||||||
walkdir = "2.3.2"
|
walkdir = "2.3.2"
|
||||||
|
svg_fmt = "0.4.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
hex-literal = "0.3"
|
hex-literal = "0.3"
|
||||||
|
|||||||
185
pageserver/src/bin/draw_trace.rs
Normal file
185
pageserver/src/bin/draw_trace.rs
Normal file
@@ -0,0 +1,185 @@
|
|||||||
|
use clap::{App, Arg};
|
||||||
|
use futures::TryFutureExt;
|
||||||
|
use pageserver::{page_service::PagestreamFeMessage, repository::Key};
|
||||||
|
|
||||||
|
use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
|
||||||
|
use std::io::Write;
|
||||||
|
use std::{
|
||||||
|
fs::{read_dir, File},
|
||||||
|
io::BufReader,
|
||||||
|
str::FromStr,
|
||||||
|
};
|
||||||
|
use svg_fmt::*;
|
||||||
|
use utils::{
|
||||||
|
id::{TenantId, TimelineId},
|
||||||
|
lsn::Lsn,
|
||||||
|
pq_proto::{BeMessage, FeMessage},
|
||||||
|
};
|
||||||
|
|
||||||
|
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
|
||||||
|
let set: BTreeSet<T> = coords.into_iter().collect();
|
||||||
|
|
||||||
|
let mut map: BTreeMap<T, usize> = BTreeMap::new();
|
||||||
|
for (i, e) in set.iter().enumerate() {
|
||||||
|
map.insert(*e, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
(set.len(), map)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() -> anyhow::Result<()> {
|
||||||
|
// TODO upgrade to struct macro arg parsing
|
||||||
|
let arg_matches = App::new("Pageserver trace visualization tool")
|
||||||
|
.about("Makes a svg file that displays the read pattern")
|
||||||
|
.arg(
|
||||||
|
Arg::new("traces_dir")
|
||||||
|
.takes_value(true)
|
||||||
|
.help("Directory where the read traces are stored"),
|
||||||
|
)
|
||||||
|
.get_matches();
|
||||||
|
|
||||||
|
// (blkno, lsn)
|
||||||
|
let mut dots = Vec::<(u32, Lsn)>::new();
|
||||||
|
|
||||||
|
|
||||||
|
let mut dump_file = File::create("dump.txt").expect("can't make file");
|
||||||
|
let mut deltas = HashMap::<i32, u32>::new();
|
||||||
|
let mut prev1: u32 = 0;
|
||||||
|
let mut prev2: u32 = 0;
|
||||||
|
let mut prev3: u32 = 0;
|
||||||
|
|
||||||
|
println!("scanning trace ...");
|
||||||
|
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
|
||||||
|
for tenant_dir in read_dir(traces_dir)? {
|
||||||
|
let entry = tenant_dir?;
|
||||||
|
let path = entry.path();
|
||||||
|
// let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||||
|
// println!("tenant: {tenant_id}");
|
||||||
|
|
||||||
|
println!("opening {path:?}");
|
||||||
|
for timeline_dir in read_dir(path)? {
|
||||||
|
let entry = timeline_dir?;
|
||||||
|
let path = entry.path();
|
||||||
|
// let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||||
|
// println!("hi");
|
||||||
|
// println!("timeline: {timeline_id}");
|
||||||
|
|
||||||
|
println!("opening {path:?}");
|
||||||
|
for trace_dir in read_dir(path)? {
|
||||||
|
let entry = trace_dir?;
|
||||||
|
let path = entry.path();
|
||||||
|
// let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||||
|
|
||||||
|
println!("opening {path:?}");
|
||||||
|
let file = File::open(path.clone())?;
|
||||||
|
let mut reader = BufReader::new(file);
|
||||||
|
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
|
||||||
|
// println!("Parsed message {:?}", msg);
|
||||||
|
match msg {
|
||||||
|
PagestreamFeMessage::Exists(_) => {}
|
||||||
|
PagestreamFeMessage::Nblocks(_) => {}
|
||||||
|
PagestreamFeMessage::GetPage(req) => {
|
||||||
|
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
|
||||||
|
// dots.push((req.blkno, req.lsn));
|
||||||
|
// HACK
|
||||||
|
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
|
||||||
|
|
||||||
|
let delta1 = (req.blkno as i32) - (prev1 as i32);
|
||||||
|
let delta2 = (req.blkno as i32) - (prev2 as i32);
|
||||||
|
let delta3 = (req.blkno as i32) - (prev3 as i32);
|
||||||
|
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
|
||||||
|
delta1
|
||||||
|
} else {
|
||||||
|
delta2
|
||||||
|
};
|
||||||
|
if i32::abs(delta3) < i32::abs(delta) {
|
||||||
|
delta = delta3;
|
||||||
|
}
|
||||||
|
let delta = delta1;
|
||||||
|
|
||||||
|
prev3 = prev2;
|
||||||
|
prev2 = prev1;
|
||||||
|
prev1 = req.blkno;
|
||||||
|
|
||||||
|
match deltas.get_mut(&delta) {
|
||||||
|
Some(c) => {*c += 1;},
|
||||||
|
None => {deltas.insert(delta, 1);},
|
||||||
|
};
|
||||||
|
|
||||||
|
if delta == 9 {
|
||||||
|
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
PagestreamFeMessage::DbSize(_) => {}
|
||||||
|
};
|
||||||
|
|
||||||
|
// HACK
|
||||||
|
// if dots.len() > 1000 {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut other = deltas.len();
|
||||||
|
deltas.retain(|_, count| *count > 3);
|
||||||
|
other -= deltas.len();
|
||||||
|
dbg!(other);
|
||||||
|
dbg!(deltas);
|
||||||
|
|
||||||
|
// Collect all coordinates
|
||||||
|
let mut keys: Vec<u32> = vec![];
|
||||||
|
let mut lsns: Vec<Lsn> = vec![];
|
||||||
|
for dot in &dots {
|
||||||
|
keys.push(dot.0);
|
||||||
|
lsns.push(dot.1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Analyze
|
||||||
|
let (key_max, key_map) = analyze(keys);
|
||||||
|
let (lsn_max, lsn_map) = analyze(lsns);
|
||||||
|
|
||||||
|
// Draw
|
||||||
|
println!("drawing trace ...");
|
||||||
|
let mut svg_file = File::create("out.svg").expect("can't make file");
|
||||||
|
writeln!(
|
||||||
|
&mut svg_file,
|
||||||
|
"{}",
|
||||||
|
BeginSvg {
|
||||||
|
w: (key_max + 1) as f32,
|
||||||
|
h: (lsn_max + 1) as f32,
|
||||||
|
}
|
||||||
|
)?;
|
||||||
|
for (key, lsn) in &dots {
|
||||||
|
let key = key_map.get(&key).unwrap();
|
||||||
|
let lsn = lsn_map.get(&lsn).unwrap();
|
||||||
|
writeln!(
|
||||||
|
&mut svg_file,
|
||||||
|
" {}",
|
||||||
|
rectangle(
|
||||||
|
*key as f32,
|
||||||
|
*lsn as f32,
|
||||||
|
10.0,
|
||||||
|
10.0
|
||||||
|
)
|
||||||
|
.fill(Fill::Color(red()))
|
||||||
|
.stroke(Stroke::Color(black(), 0.0))
|
||||||
|
.border_radius(0.5)
|
||||||
|
)?;
|
||||||
|
// println!(" {}",
|
||||||
|
// rectangle(key_start as f32 + stretch * margin,
|
||||||
|
// stretch * (lsn_max as f32 - 1.0 - (lsn_start as f32 + margin - lsn_offset)),
|
||||||
|
// key_diff as f32 - stretch * 2.0 * margin,
|
||||||
|
// stretch * (lsn_diff - 2.0 * margin))
|
||||||
|
// // .fill(rgb(200, 200, 200))
|
||||||
|
// .fill(fill)
|
||||||
|
// .stroke(Stroke::Color(rgb(200, 200, 200), 0.1))
|
||||||
|
// .border_radius(0.4)
|
||||||
|
// );
|
||||||
|
}
|
||||||
|
|
||||||
|
writeln!(&mut svg_file, "{}", EndSvg)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
133
pageserver/src/bin/replay.rs
Normal file
133
pageserver/src/bin/replay.rs
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
use bytes::BytesMut;
|
||||||
|
use pageserver::page_service::PagestreamFeMessage;
|
||||||
|
use std::{
|
||||||
|
fs::{read_dir, File},
|
||||||
|
io::BufReader,
|
||||||
|
path::PathBuf,
|
||||||
|
str::FromStr,
|
||||||
|
};
|
||||||
|
use tokio::{io::AsyncWriteExt, net::TcpStream};
|
||||||
|
|
||||||
|
use clap::{App, Arg};
|
||||||
|
use utils::{
|
||||||
|
id::{TenantId, TimelineId},
|
||||||
|
pq_proto::{BeMessage, FeMessage},
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO put this in library, dedup with stuff in control_plane
|
||||||
|
/// Client for the pageserver's pagestream API
|
||||||
|
struct PagestreamApi {
|
||||||
|
stream: TcpStream,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PagestreamApi {
|
||||||
|
async fn connect(
|
||||||
|
connstr: &str,
|
||||||
|
tenant: &TenantId,
|
||||||
|
timeline: &TimelineId,
|
||||||
|
) -> anyhow::Result<PagestreamApi> {
|
||||||
|
// Parse connstr
|
||||||
|
let config = tokio_postgres::Config::from_str(connstr).expect("bad connstr");
|
||||||
|
let tcp_addr = format!("localhost:{}", config.get_ports()[0]);
|
||||||
|
|
||||||
|
// Connect
|
||||||
|
let mut stream = TcpStream::connect(tcp_addr).await?;
|
||||||
|
let (client, conn) = config
|
||||||
|
.connect_raw(&mut stream, tokio_postgres::NoTls)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Enter pagestream protocol
|
||||||
|
let init_query = format!("pagestream {} {}", tenant, timeline);
|
||||||
|
tokio::select! {
|
||||||
|
_ = conn => panic!("connection closed during pagestream initialization"),
|
||||||
|
_ = client.query(init_query.as_str(), &[]) => (),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(PagestreamApi { stream })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
|
||||||
|
let request = {
|
||||||
|
let msg_bytes = msg.serialize();
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
let copy_msg = BeMessage::CopyData(&msg_bytes);
|
||||||
|
|
||||||
|
// TODO it's actually a fe message but it doesn't have a serializer yet
|
||||||
|
BeMessage::write(&mut buf, ©_msg)?;
|
||||||
|
buf.freeze()
|
||||||
|
};
|
||||||
|
self.stream.write_all(&request).await?;
|
||||||
|
|
||||||
|
// TODO It's actually a be message, but it doesn't have a parser.
|
||||||
|
// So error response (code b'E' parses incorrectly as FeExecuteMessage)
|
||||||
|
let _response = match FeMessage::read_fut(&mut self.stream).await? {
|
||||||
|
Some(FeMessage::CopyData(page)) => page,
|
||||||
|
r => panic!("Expected CopyData message, got: {:?}", r),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn replay_trace<R: std::io::Read>(
|
||||||
|
reader: &mut R,
|
||||||
|
mut pagestream: PagestreamApi,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
|
||||||
|
println!("Parsed message {:?}", msg);
|
||||||
|
pagestream.make_request(msg).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
// TODO upgrade to struct macro arg parsing
|
||||||
|
let arg_matches = App::new("Pageserver trace replay tool")
|
||||||
|
.about("Replays wal or read traces to test pageserver performance")
|
||||||
|
.arg(
|
||||||
|
Arg::new("traces_dir")
|
||||||
|
.takes_value(true)
|
||||||
|
.help("Directory where the read traces are stored"),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::new("pageserver_connstr")
|
||||||
|
.takes_value(true)
|
||||||
|
.help("Pageserver pg endpoint to connect to"),
|
||||||
|
)
|
||||||
|
.get_matches();
|
||||||
|
|
||||||
|
let connstr = arg_matches.value_of("pageserver_connstr").unwrap();
|
||||||
|
|
||||||
|
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
|
||||||
|
for tenant_dir in read_dir(traces_dir)? {
|
||||||
|
let entry = tenant_dir?;
|
||||||
|
let path = entry.path();
|
||||||
|
let tenant_id = TenantId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||||
|
|
||||||
|
for timeline_dir in read_dir(path)? {
|
||||||
|
let entry = timeline_dir?;
|
||||||
|
let path = entry.path();
|
||||||
|
let timeline_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||||
|
|
||||||
|
for trace_dir in read_dir(path)? {
|
||||||
|
let entry = trace_dir?;
|
||||||
|
let path = entry.path();
|
||||||
|
let _conn_id = TimelineId::from_str(path.file_name().unwrap().to_str().unwrap())?;
|
||||||
|
|
||||||
|
// TODO The pageserver deletes existing traces?
|
||||||
|
// LOL yes because I use tenant ID as trace id
|
||||||
|
let pagestream = PagestreamApi::connect(connstr, &tenant_id, &timeline_id).await?;
|
||||||
|
|
||||||
|
let file = File::open(path.clone())?;
|
||||||
|
let mut reader = BufReader::new(file);
|
||||||
|
// let len = file.metadata().unwrap().len();
|
||||||
|
// println!("replaying {:?} trace {} bytes", path, len);
|
||||||
|
replay_trace(&mut reader, pagestream).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -364,6 +364,23 @@ impl PageServerConf {
|
|||||||
self.timelines_path(tenant_id).join(timeline_id.to_string())
|
self.timelines_path(tenant_id).join(timeline_id.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn traces_path(&self) -> PathBuf {
|
||||||
|
self.workdir.join("traces")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn trace_path(
|
||||||
|
&self,
|
||||||
|
tenant_id: &TenantId,
|
||||||
|
timeline_id: &TimelineId,
|
||||||
|
connection_id: &TimelineId, // TODO make a new type
|
||||||
|
) -> PathBuf {
|
||||||
|
self.traces_path()
|
||||||
|
.join(tenant_id.to_string())
|
||||||
|
.join(timeline_id.to_string())
|
||||||
|
.join(connection_id.to_string())
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/// Points to a place in pageserver's local directory,
|
/// Points to a place in pageserver's local directory,
|
||||||
/// where certain timeline's metadata file should be located.
|
/// where certain timeline's metadata file should be located.
|
||||||
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {
|
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {
|
||||||
|
|||||||
@@ -617,6 +617,9 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
|||||||
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
|
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
|
||||||
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||||
}
|
}
|
||||||
|
if let Some(trace_read_requests) = request_data.trace_read_requests {
|
||||||
|
tenant_conf.trace_read_requests = Some(trace_read_requests);
|
||||||
|
}
|
||||||
|
|
||||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||||
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
|
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ pub mod tenant;
|
|||||||
pub mod tenant_config;
|
pub mod tenant_config;
|
||||||
pub mod tenant_mgr;
|
pub mod tenant_mgr;
|
||||||
pub mod tenant_tasks;
|
pub mod tenant_tasks;
|
||||||
|
// pub mod timelines;
|
||||||
|
pub mod trace;
|
||||||
pub mod virtual_file;
|
pub mod virtual_file;
|
||||||
pub mod walingest;
|
pub mod walingest;
|
||||||
pub mod walreceiver;
|
pub mod walreceiver;
|
||||||
|
|||||||
@@ -10,7 +10,9 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
use anyhow::{bail, ensure, Context, Result};
|
use anyhow::{bail, ensure, Context, Result};
|
||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use byteorder::{BigEndian, ReadBytesExt};
|
||||||
|
use bytes::Buf;
|
||||||
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use std::io;
|
use std::io;
|
||||||
@@ -42,6 +44,7 @@ use crate::task_mgr;
|
|||||||
use crate::task_mgr::TaskKind;
|
use crate::task_mgr::TaskKind;
|
||||||
use crate::tenant::Timeline;
|
use crate::tenant::Timeline;
|
||||||
use crate::tenant_mgr;
|
use crate::tenant_mgr;
|
||||||
|
use crate::trace::Tracer;
|
||||||
use crate::CheckpointConfig;
|
use crate::CheckpointConfig;
|
||||||
|
|
||||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||||
@@ -49,7 +52,9 @@ use postgres_ffi::to_pg_timestamp;
|
|||||||
use postgres_ffi::BLCKSZ;
|
use postgres_ffi::BLCKSZ;
|
||||||
|
|
||||||
// Wrapped in libpq CopyData
|
// Wrapped in libpq CopyData
|
||||||
enum PagestreamFeMessage {
|
// TODO these should be in a library outside the pageserver
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum PagestreamFeMessage {
|
||||||
Exists(PagestreamExistsRequest),
|
Exists(PagestreamExistsRequest),
|
||||||
Nblocks(PagestreamNblocksRequest),
|
Nblocks(PagestreamNblocksRequest),
|
||||||
GetPage(PagestreamGetPageRequest),
|
GetPage(PagestreamGetPageRequest),
|
||||||
@@ -57,7 +62,7 @@ enum PagestreamFeMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wrapped in libpq CopyData
|
// Wrapped in libpq CopyData
|
||||||
enum PagestreamBeMessage {
|
pub enum PagestreamBeMessage {
|
||||||
Exists(PagestreamExistsResponse),
|
Exists(PagestreamExistsResponse),
|
||||||
Nblocks(PagestreamNblocksResponse),
|
Nblocks(PagestreamNblocksResponse),
|
||||||
GetPage(PagestreamGetPageResponse),
|
GetPage(PagestreamGetPageResponse),
|
||||||
@@ -66,106 +71,152 @@ enum PagestreamBeMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PagestreamExistsRequest {
|
pub struct PagestreamExistsRequest {
|
||||||
latest: bool,
|
latest: bool,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
rel: RelTag,
|
rel: RelTag,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PagestreamNblocksRequest {
|
pub struct PagestreamNblocksRequest {
|
||||||
latest: bool,
|
latest: bool,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
rel: RelTag,
|
rel: RelTag,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PagestreamGetPageRequest {
|
pub struct PagestreamGetPageRequest {
|
||||||
latest: bool,
|
latest: bool,
|
||||||
lsn: Lsn,
|
pub lsn: Lsn,
|
||||||
rel: RelTag,
|
pub rel: RelTag,
|
||||||
blkno: u32,
|
pub blkno: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PagestreamDbSizeRequest {
|
pub struct PagestreamDbSizeRequest {
|
||||||
latest: bool,
|
latest: bool,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
dbnode: u32,
|
dbnode: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PagestreamExistsResponse {
|
pub struct PagestreamExistsResponse {
|
||||||
exists: bool,
|
exists: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PagestreamNblocksResponse {
|
pub struct PagestreamNblocksResponse {
|
||||||
n_blocks: u32,
|
n_blocks: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PagestreamGetPageResponse {
|
pub struct PagestreamGetPageResponse {
|
||||||
page: Bytes,
|
page: Bytes,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PagestreamErrorResponse {
|
pub struct PagestreamErrorResponse {
|
||||||
message: String,
|
message: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PagestreamDbSizeResponse {
|
pub struct PagestreamDbSizeResponse {
|
||||||
db_size: i64,
|
db_size: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PagestreamFeMessage {
|
impl PagestreamFeMessage {
|
||||||
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
|
pub fn serialize(&self) -> Bytes {
|
||||||
|
let mut bytes = BytesMut::new();
|
||||||
|
|
||||||
|
match self {
|
||||||
|
Self::Exists(req) => {
|
||||||
|
bytes.put_u8(0);
|
||||||
|
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||||
|
bytes.put_u64(req.lsn.0);
|
||||||
|
bytes.put_u32(req.rel.spcnode);
|
||||||
|
bytes.put_u32(req.rel.dbnode);
|
||||||
|
bytes.put_u32(req.rel.relnode);
|
||||||
|
bytes.put_u8(req.rel.forknum);
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::Nblocks(req) => {
|
||||||
|
bytes.put_u8(1);
|
||||||
|
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||||
|
bytes.put_u64(req.lsn.0);
|
||||||
|
bytes.put_u32(req.rel.spcnode);
|
||||||
|
bytes.put_u32(req.rel.dbnode);
|
||||||
|
bytes.put_u32(req.rel.relnode);
|
||||||
|
bytes.put_u8(req.rel.forknum);
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::GetPage(req) => {
|
||||||
|
bytes.put_u8(2);
|
||||||
|
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||||
|
bytes.put_u64(req.lsn.0);
|
||||||
|
bytes.put_u32(req.rel.spcnode);
|
||||||
|
bytes.put_u32(req.rel.dbnode);
|
||||||
|
bytes.put_u32(req.rel.relnode);
|
||||||
|
bytes.put_u8(req.rel.forknum);
|
||||||
|
bytes.put_u32(req.blkno);
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::DbSize(req) => {
|
||||||
|
bytes.put_u8(3);
|
||||||
|
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||||
|
bytes.put_u64(req.lsn.0);
|
||||||
|
bytes.put_u32(req.dbnode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes.into()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
|
||||||
// TODO these gets can fail
|
// TODO these gets can fail
|
||||||
|
|
||||||
// these correspond to the NeonMessageTag enum in pagestore_client.h
|
// these correspond to the NeonMessageTag enum in pagestore_client.h
|
||||||
//
|
//
|
||||||
// TODO: consider using protobuf or serde bincode for less error prone
|
// TODO: consider using protobuf or serde bincode for less error prone
|
||||||
// serialization.
|
// serialization.
|
||||||
let msg_tag = body.get_u8();
|
let msg_tag = body.read_u8()?;
|
||||||
match msg_tag {
|
match msg_tag {
|
||||||
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||||
latest: body.get_u8() != 0,
|
latest: body.read_u8()? != 0,
|
||||||
lsn: Lsn::from(body.get_u64()),
|
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
spcnode: body.get_u32(),
|
spcnode: body.read_u32::<BigEndian>()?,
|
||||||
dbnode: body.get_u32(),
|
dbnode: body.read_u32::<BigEndian>()?,
|
||||||
relnode: body.get_u32(),
|
relnode: body.read_u32::<BigEndian>()?,
|
||||||
forknum: body.get_u8(),
|
forknum: body.read_u8()?,
|
||||||
},
|
},
|
||||||
})),
|
})),
|
||||||
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||||
latest: body.get_u8() != 0,
|
latest: body.read_u8()? != 0,
|
||||||
lsn: Lsn::from(body.get_u64()),
|
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
spcnode: body.get_u32(),
|
spcnode: body.read_u32::<BigEndian>()?,
|
||||||
dbnode: body.get_u32(),
|
dbnode: body.read_u32::<BigEndian>()?,
|
||||||
relnode: body.get_u32(),
|
relnode: body.read_u32::<BigEndian>()?,
|
||||||
forknum: body.get_u8(),
|
forknum: body.read_u8()?,
|
||||||
},
|
},
|
||||||
})),
|
})),
|
||||||
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||||
latest: body.get_u8() != 0,
|
latest: body.read_u8()? != 0,
|
||||||
lsn: Lsn::from(body.get_u64()),
|
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
spcnode: body.get_u32(),
|
spcnode: body.read_u32::<BigEndian>()?,
|
||||||
dbnode: body.get_u32(),
|
dbnode: body.read_u32::<BigEndian>()?,
|
||||||
relnode: body.get_u32(),
|
relnode: body.read_u32::<BigEndian>()?,
|
||||||
forknum: body.get_u8(),
|
forknum: body.read_u8()?,
|
||||||
},
|
},
|
||||||
blkno: body.get_u32(),
|
blkno: body.read_u32::<BigEndian>()?,
|
||||||
})),
|
})),
|
||||||
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||||
latest: body.get_u8() != 0,
|
latest: body.read_u8()? != 0,
|
||||||
lsn: Lsn::from(body.get_u64()),
|
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||||
dbnode: body.get_u32(),
|
dbnode: body.read_u32::<BigEndian>()?,
|
||||||
})),
|
})),
|
||||||
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
|
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -422,6 +473,17 @@ impl PageServerHandler {
|
|||||||
// so there is no need to reset the association
|
// so there is no need to reset the association
|
||||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||||
|
|
||||||
|
// Make request tracer if needed
|
||||||
|
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
|
||||||
|
let mut tracer = if tenant.get_trace_read_requests() {
|
||||||
|
let path = tenant
|
||||||
|
.conf
|
||||||
|
.trace_path(&tenant_id, &timeline_id, &TimelineId::generate());
|
||||||
|
Some(Tracer::new(path))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
// Check that the timeline exists
|
// Check that the timeline exists
|
||||||
let timeline = get_local_timeline(tenant_id, timeline_id)?;
|
let timeline = get_local_timeline(tenant_id, timeline_id)?;
|
||||||
|
|
||||||
@@ -446,15 +508,24 @@ impl PageServerHandler {
|
|||||||
|
|
||||||
let copy_data_bytes = match msg? {
|
let copy_data_bytes = match msg? {
|
||||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||||
|
Some(FeMessage::Sync) => {
|
||||||
|
// TODO what now?
|
||||||
|
continue;
|
||||||
|
}
|
||||||
Some(m) => {
|
Some(m) => {
|
||||||
bail!("unexpected message: {m:?} during COPY");
|
bail!("unexpected message: {m:?} during COPY");
|
||||||
}
|
}
|
||||||
None => break, // client disconnected
|
None => break, // client disconnected
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Trace request if needed
|
||||||
|
if let Some(t) = tracer.as_mut() {
|
||||||
|
t.trace(©_data_bytes)
|
||||||
|
}
|
||||||
|
|
||||||
trace!("query: {copy_data_bytes:?}");
|
trace!("query: {copy_data_bytes:?}");
|
||||||
|
|
||||||
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
|
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||||
|
|
||||||
let response = match neon_fe_msg {
|
let response = match neon_fe_msg {
|
||||||
PagestreamFeMessage::Exists(req) => {
|
PagestreamFeMessage::Exists(req) => {
|
||||||
|
|||||||
@@ -593,6 +593,14 @@ impl Tenant {
|
|||||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
|
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO is there a need for all this boilerplate?
|
||||||
|
pub fn get_trace_read_requests(&self) -> bool {
|
||||||
|
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||||
|
tenant_conf
|
||||||
|
.trace_read_requests
|
||||||
|
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||||
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
|
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -82,6 +82,10 @@ pub struct TenantConf {
|
|||||||
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
|
||||||
/// to avoid eager reconnects.
|
/// to avoid eager reconnects.
|
||||||
pub max_lsn_wal_lag: NonZeroU64,
|
pub max_lsn_wal_lag: NonZeroU64,
|
||||||
|
/// If enabled, records all read requests for this ... TODO
|
||||||
|
/// Even though read traces are small, there's no delete meachanism so they can
|
||||||
|
/// pile up. So we disable this by default.
|
||||||
|
pub trace_read_requests: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Same as TenantConf, but this struct preserves the information about
|
/// Same as TenantConf, but this struct preserves the information about
|
||||||
@@ -105,6 +109,7 @@ pub struct TenantConfOpt {
|
|||||||
#[serde(with = "humantime_serde")]
|
#[serde(with = "humantime_serde")]
|
||||||
pub lagging_wal_timeout: Option<Duration>,
|
pub lagging_wal_timeout: Option<Duration>,
|
||||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||||
|
pub trace_read_requests: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TenantConfOpt {
|
impl TenantConfOpt {
|
||||||
@@ -138,6 +143,9 @@ impl TenantConfOpt {
|
|||||||
.lagging_wal_timeout
|
.lagging_wal_timeout
|
||||||
.unwrap_or(global_conf.lagging_wal_timeout),
|
.unwrap_or(global_conf.lagging_wal_timeout),
|
||||||
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
|
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
|
||||||
|
trace_read_requests: self
|
||||||
|
.trace_read_requests
|
||||||
|
.unwrap_or(global_conf.trace_read_requests),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -207,6 +215,7 @@ impl TenantConf {
|
|||||||
.expect("cannot parse default walreceiver lagging wal timeout"),
|
.expect("cannot parse default walreceiver lagging wal timeout"),
|
||||||
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||||
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
.expect("cannot parse default max walreceiver Lsn wal lag"),
|
||||||
|
trace_read_requests: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -232,6 +241,7 @@ impl TenantConf {
|
|||||||
.unwrap(),
|
.unwrap(),
|
||||||
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
|
trace_read_requests: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
36
pageserver/src/trace.rs
Normal file
36
pageserver/src/trace.rs
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
use bytes::Bytes;
|
||||||
|
use std::{
|
||||||
|
fs::{create_dir_all, File},
|
||||||
|
io::{BufWriter, Write},
|
||||||
|
path::PathBuf,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct Tracer {
|
||||||
|
writer: BufWriter<File>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Tracer {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Tracer {
|
||||||
|
pub fn new(path: PathBuf) -> Self {
|
||||||
|
let parent = path.parent().expect("failed to parse parent path");
|
||||||
|
create_dir_all(parent).expect("failed to create trace dir");
|
||||||
|
|
||||||
|
let file = File::create(path).expect("failed to create trace file");
|
||||||
|
Tracer {
|
||||||
|
writer: BufWriter::new(file),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn trace(&mut self, msg: &Bytes) {
|
||||||
|
self.writer.write(msg).expect("failed to write trace");
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn flush(&mut self) {
|
||||||
|
self.writer.flush().expect("failed to flush trace file");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -264,12 +264,76 @@ pageserver_flush(void)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Entry of the page_cache
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
// HACK Just xor of bytes lol
|
||||||
|
char request_hash;
|
||||||
|
|
||||||
|
// Points directly to a NeonResponse. We can't just own the
|
||||||
|
// NeonResponse because it's a "supertype", so it's not Sized.
|
||||||
|
char* response;
|
||||||
|
int len;
|
||||||
|
} NeonRequestResponse;
|
||||||
|
|
||||||
|
#define MAX_PAGE_CACHE_SIZE 50
|
||||||
|
NeonRequestResponse page_cache[MAX_PAGE_CACHE_SIZE];
|
||||||
|
int page_cache_size = 0;
|
||||||
|
int page_cache_head = 0;
|
||||||
|
|
||||||
static NeonResponse *
|
static NeonResponse *
|
||||||
pageserver_call(NeonRequest * request)
|
pageserver_call(NeonRequest * request)
|
||||||
{
|
{
|
||||||
|
// Compute hash
|
||||||
|
char hash = 0;
|
||||||
|
StringInfoData req_buff;
|
||||||
|
req_buff = nm_pack_request(request);
|
||||||
|
for (int i = 0; i < req_buff.len; i++) {
|
||||||
|
hash ^= req_buff.data[i];
|
||||||
|
}
|
||||||
|
pfree(req_buff.data);
|
||||||
|
|
||||||
|
// If result is cached, memcpy and return
|
||||||
|
for (int i = 0; i < page_cache_size; i++) {
|
||||||
|
if (page_cache[i].request_hash == hash) {
|
||||||
|
int len = page_cache[0].response->len;
|
||||||
|
NeonResponse *resp = palloc0(len);
|
||||||
|
// I'd rather Rc than memcpy, but this is not rust :(
|
||||||
|
memcpy(resp, page_cache[0].response->data, len);
|
||||||
|
elog(LOG, "cache hit !!!");
|
||||||
|
return resp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send request, get response
|
||||||
pageserver_send(request);
|
pageserver_send(request);
|
||||||
pageserver_flush();
|
pageserver_flush();
|
||||||
return pageserver_receive();
|
NeonResponse *resp = pageserver_receive();
|
||||||
|
|
||||||
|
// Get length
|
||||||
|
int len = -1;
|
||||||
|
switch (resp->tag) {
|
||||||
|
case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); }
|
||||||
|
case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); }
|
||||||
|
case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; }
|
||||||
|
case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); }
|
||||||
|
case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); }
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cache result
|
||||||
|
page_cache[page_cache_head].request_hash = hash;
|
||||||
|
if (page_cache_head < page_cache_size) {
|
||||||
|
pfree(page_cache[page_cache_head].response);
|
||||||
|
}
|
||||||
|
page_cache[page_cache_head].len = len;
|
||||||
|
page_cache[page_cache_head].response = MemoryContextAlloc(TopMemoryContext, len);
|
||||||
|
memcpy(page_cache[page_cache_head].response, resp, len);
|
||||||
|
page_cache_head = (page_cache_head + 1) % MAX_PAGE_CACHE_SIZE;
|
||||||
|
if (page_cache_size < MAX_PAGE_CACHE_SIZE) {
|
||||||
|
page_cache_size += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
page_server_api api = {
|
page_server_api api = {
|
||||||
|
|||||||
@@ -91,10 +91,25 @@ class NeonCompare(PgCompare):
|
|||||||
self._pg_bin = pg_bin
|
self._pg_bin = pg_bin
|
||||||
self.pageserver_http_client = self.env.pageserver.http_client()
|
self.pageserver_http_client = self.env.pageserver.http_client()
|
||||||
|
|
||||||
|
self.tenant, _ = self.env.neon_cli.create_tenant(
|
||||||
|
conf={
|
||||||
|
"trace_read_requests": "true",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# HACK enable request tracing, as an experiment
|
||||||
|
# NOTE This must be done before the pg starts pagestream
|
||||||
|
# TODO why does it not work?
|
||||||
|
# self.env.neon_cli.config_tenant(self.env.initial_tenant, conf={
|
||||||
|
# "trace_read_requests": "true",
|
||||||
|
# })
|
||||||
|
|
||||||
# We only use one branch and one timeline
|
# We only use one branch and one timeline
|
||||||
self.env.neon_cli.create_branch(branch_name, "empty")
|
# self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
|
||||||
self._pg = self.env.postgres.create_start(branch_name)
|
# self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
|
||||||
self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
|
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
|
||||||
|
self._pg = self.env.postgres.create_start(
|
||||||
|
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pg(self):
|
def pg(self):
|
||||||
@@ -109,10 +124,10 @@ class NeonCompare(PgCompare):
|
|||||||
return self._pg_bin
|
return self._pg_bin
|
||||||
|
|
||||||
def flush(self):
|
def flush(self):
|
||||||
self.pageserver_http_client.timeline_gc(self.env.initial_tenant, self.timeline, 0)
|
self.pageserver_http_client.timeline_gc(self.tenant, self.timeline, 0)
|
||||||
|
|
||||||
def compact(self):
|
def compact(self):
|
||||||
self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
|
self.pageserver_http_client.timeline_compact(self.tenant, self.timeline)
|
||||||
|
|
||||||
def report_peak_memory_use(self) -> None:
|
def report_peak_memory_use(self) -> None:
|
||||||
self.zenbenchmark.record(
|
self.zenbenchmark.record(
|
||||||
@@ -124,7 +139,7 @@ class NeonCompare(PgCompare):
|
|||||||
|
|
||||||
def report_size(self) -> None:
|
def report_size(self) -> None:
|
||||||
timeline_size = self.zenbenchmark.get_timeline_size(
|
timeline_size = self.zenbenchmark.get_timeline_size(
|
||||||
self.env.repo_dir, self.env.initial_tenant, self.timeline
|
self.env.repo_dir, self.tenant, self.timeline
|
||||||
)
|
)
|
||||||
self.zenbenchmark.record(
|
self.zenbenchmark.record(
|
||||||
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
|
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
|
||||||
|
|||||||
@@ -1749,6 +1749,37 @@ def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
|
|||||||
return PgBin(test_output_dir, pg_version)
|
return PgBin(test_output_dir, pg_version)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ReplayBin:
|
||||||
|
"""A helper class for replaying pageserver wal and read traces."""
|
||||||
|
|
||||||
|
traces_dir: str
|
||||||
|
|
||||||
|
def replay_all(self, pageserver_connstr):
|
||||||
|
replay_binpath = os.path.join(str(neon_binpath), "replay")
|
||||||
|
args = [
|
||||||
|
replay_binpath,
|
||||||
|
self.traces_dir,
|
||||||
|
pageserver_connstr,
|
||||||
|
]
|
||||||
|
# return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
|
||||||
|
subprocess.run(args)
|
||||||
|
|
||||||
|
def draw_all(self):
|
||||||
|
draw_binpath = os.path.join(str(neon_binpath), "draw_trace")
|
||||||
|
args = [
|
||||||
|
draw_binpath,
|
||||||
|
self.traces_dir,
|
||||||
|
]
|
||||||
|
subprocess.run(args)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def replay_bin(test_output_dir):
|
||||||
|
traces_dir = os.path.join(test_output_dir, "repo", "traces")
|
||||||
|
return ReplayBin(traces_dir)
|
||||||
|
|
||||||
|
|
||||||
class VanillaPostgres(PgProtocol):
|
class VanillaPostgres(PgProtocol):
|
||||||
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
|
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
|
||||||
super().__init__(host="localhost", port=port, dbname="postgres")
|
super().__init__(host="localhost", port=port, dbname="postgres")
|
||||||
|
|||||||
@@ -175,6 +175,11 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
|
|||||||
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("scale", get_scales_matrix())
|
||||||
|
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||||
|
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
|
||||||
|
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
|
||||||
|
|
||||||
# Run the pgbench tests, and generate a flamegraph from it
|
# Run the pgbench tests, and generate a flamegraph from it
|
||||||
# This requires that the pageserver was built with the 'profiling' feature.
|
# This requires that the pageserver was built with the 'profiling' feature.
|
||||||
#
|
#
|
||||||
|
|||||||
57
test_runner/performance/test_trace_replay.py
Normal file
57
test_runner/performance/test_trace_replay.py
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
from contextlib import closing
|
||||||
|
|
||||||
|
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||||
|
from fixtures.neon_fixtures import NeonEnvBuilder, ReplayBin
|
||||||
|
|
||||||
|
|
||||||
|
# This test is a demonstration of how to do trace playback. With the current
|
||||||
|
# workload it uses it's not testing anything meaningful.
|
||||||
|
def test_trace_replay(
|
||||||
|
neon_env_builder: NeonEnvBuilder, replay_bin: ReplayBin, zenbenchmark: NeonBenchmarker
|
||||||
|
):
|
||||||
|
neon_env_builder.num_safekeepers = 1
|
||||||
|
env = neon_env_builder.init_start()
|
||||||
|
|
||||||
|
tenant, _ = env.neon_cli.create_tenant(
|
||||||
|
conf={
|
||||||
|
"trace_read_requests": "true",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
# TODO This doesn't work because I haven't updated tenant_config_handler
|
||||||
|
# env.neon_cli.config_tenant(tenant, conf={
|
||||||
|
# "trace_read_requests": "true",
|
||||||
|
# })
|
||||||
|
env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
|
||||||
|
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
|
||||||
|
|
||||||
|
with zenbenchmark.record_duration("run"):
|
||||||
|
pg.safe_psql("select 1;")
|
||||||
|
pg.safe_psql("select 1;")
|
||||||
|
pg.safe_psql("select 1;")
|
||||||
|
pg.safe_psql("select 1;")
|
||||||
|
|
||||||
|
with closing(pg.connect()) as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("create table t (i integer);")
|
||||||
|
cur.execute(f"insert into t values (generate_series(1,{10000}));")
|
||||||
|
cur.execute("select count(*) from t;")
|
||||||
|
|
||||||
|
# Stop pg so we drop the connection and flush the traces
|
||||||
|
pg.stop()
|
||||||
|
|
||||||
|
# TODO This doesn't work because I haven't updated tenant_config_handler
|
||||||
|
# env.neon_cli.config_tenant(tenant, conf={
|
||||||
|
# "trace_read_requests": "false",
|
||||||
|
# })
|
||||||
|
|
||||||
|
# trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline) / str(timeline)
|
||||||
|
# assert trace_path.exists()
|
||||||
|
|
||||||
|
replay_bin.draw_all()
|
||||||
|
return
|
||||||
|
|
||||||
|
print("replaying")
|
||||||
|
ps_connstr = env.pageserver.connstr()
|
||||||
|
with zenbenchmark.record_duration("replay"):
|
||||||
|
output = replay_bin.replay_all(ps_connstr)
|
||||||
|
print(output)
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
from contextlib import closing
|
from contextlib import closing
|
||||||
|
|
||||||
import psycopg2.extras
|
import psycopg2.extras
|
||||||
|
import pytest
|
||||||
from fixtures.log_helper import log
|
from fixtures.log_helper import log
|
||||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user