This commit is contained in:
Bojan Serafimov
2022-10-05 13:21:42 -04:00
parent 293bc69e4f
commit 5315614974
6 changed files with 152 additions and 21 deletions

View File

@@ -176,9 +176,11 @@ impl PageServerNode {
new_tenant_id: Option<TenantId>,
new_timeline_id: Option<TimelineId>,
) -> anyhow::Result<TimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())
.context("failed to create tenant")?;
let initial_timeline_info =
self.timeline_create(initial_tenant_id, new_timeline_id, None, None)?;
self.timeline_create(initial_tenant_id, new_timeline_id, None, None)
.context("failed to create timeline")?;
Ok(initial_timeline_info.timeline_id)
}

View File

@@ -1,10 +1,12 @@
use clap::{App, Arg};
use futures::TryFutureExt;
use pageserver::{page_service::PagestreamFeMessage, repository::Key};
use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf};
use std::io::Write;
use std::{
fs::{read_dir, File},
io::BufReader,
path::PathBuf,
str::FromStr,
};
use svg_fmt::*;
@@ -14,6 +16,17 @@ use utils::{
pq_proto::{BeMessage, FeMessage},
};
fn analyze<T: Ord + Copy>(coords: Vec<T>) -> (usize, BTreeMap<T, usize>) {
let set: BTreeSet<T> = coords.into_iter().collect();
let mut map: BTreeMap<T, usize> = BTreeMap::new();
for (i, e) in set.iter().enumerate() {
map.insert(*e, i);
}
(set.len(), map)
}
fn main() -> anyhow::Result<()> {
// TODO upgrade to struct macro arg parsing
let arg_matches = App::new("Pageserver trace visualization tool")
@@ -28,6 +41,13 @@ fn main() -> anyhow::Result<()> {
// (blkno, lsn)
let mut dots = Vec::<(u32, Lsn)>::new();
let mut dump_file = File::create("dump.txt").expect("can't make file");
let mut deltas = HashMap::<i32, u32>::new();
let mut prev1: u32 = 0;
let mut prev2: u32 = 0;
let mut prev3: u32 = 0;
println!("scanning trace ...");
let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap());
for tenant_dir in read_dir(traces_dir)? {
@@ -58,24 +78,66 @@ fn main() -> anyhow::Result<()> {
match msg {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetPage(req) => dots.push((req.blkno, req.lsn)),
PagestreamFeMessage::GetPage(req) => {
writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?;
// dots.push((req.blkno, req.lsn));
// HACK
dots.push((req.blkno, Lsn::from(dots.len() as u64)));
let delta1 = (req.blkno as i32) - (prev1 as i32);
let delta2 = (req.blkno as i32) - (prev2 as i32);
let delta3 = (req.blkno as i32) - (prev3 as i32);
let mut delta = if i32::abs(delta1) < i32::abs(delta2) {
delta1
} else {
delta2
};
if i32::abs(delta3) < i32::abs(delta) {
delta = delta3;
}
prev3 = prev2;
prev2 = prev1;
prev1 = req.blkno;
match deltas.get_mut(&delta) {
Some(c) => {*c += 1;},
None => {deltas.insert(delta, 1);},
};
if delta == 9 {
println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn);
}
},
PagestreamFeMessage::DbSize(_) => {}
};
// HACK
if dots.len() > 100 {
break;
}
// if dots.len() > 1000 {
// break;
// }
}
}
}
}
let mut other = deltas.len();
deltas.retain(|_, count| *count > 3);
other -= deltas.len();
dbg!(other);
dbg!(deltas);
// Collect all coordinates
let mut keys: Vec<u32> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for dot in &dots {
keys.push(dot.0);
lsns.push(dot.1);
}
// Analyze
let blkno_max = (&dots).into_iter().map(|(blkno, _)| blkno).max().unwrap();
let blkno_min = (&dots).into_iter().map(|(blkno, _)| blkno).min().unwrap();
let lsn_max = (&dots).into_iter().map(|(_, lsn)| lsn).max().unwrap();
let lsn_min = (&dots).into_iter().map(|(_, lsn)| lsn).min().unwrap();
let (key_max, key_map) = analyze(keys);
let (lsn_max, lsn_map) = analyze(lsns);
// Draw
println!("drawing trace ...");
@@ -84,19 +146,21 @@ fn main() -> anyhow::Result<()> {
&mut svg_file,
"{}",
BeginSvg {
w: (blkno_max - blkno_min + 1) as f32,
h: (lsn_max.0 - lsn_min.0 + 1) as f32,
w: (key_max + 1) as f32,
h: (lsn_max + 1) as f32,
}
)?;
for dot in &dots {
for (key, lsn) in &dots {
let key = key_map.get(&key).unwrap();
let lsn = lsn_map.get(&lsn).unwrap();
writeln!(
&mut svg_file,
" {}",
rectangle(
(dot.0 - blkno_min) as f32,
(dot.1 .0 - lsn_min.0) as f32,
1.0,
1.0
*key as f32,
*lsn as f32,
10.0,
10.0
)
.fill(Fill::Color(red()))
.stroke(Stroke::Color(black(), 0.0))

View File

@@ -88,7 +88,7 @@ pub struct PagestreamNblocksRequest {
pub struct PagestreamGetPageRequest {
latest: bool,
pub lsn: Lsn,
rel: RelTag,
pub rel: RelTag,
pub blkno: u32,
}

View File

@@ -264,12 +264,71 @@ pageserver_flush(void)
}
}
// Entry of the page_cache
typedef struct
{
// HACK Just xor of bytes lol
char request_hash;
// Points directly to a NeonResponse. We can't just own the
// NeonResponse because it's a "supertype", so it's not Sized.
StringInfo response;
} NeonRequestResponse;
NeonRequestResponse page_cache[20];
int page_cache_size = 0;
int page_cache_head = 0;
static NeonResponse *
pageserver_call(NeonRequest * request)
{
// Compute hash
char hash = 0;
StringInfoData req_buff;
req_buff = zm_pack_request(request);
for (int i = 0; i < req_buff.len; i++) {
hash ^= req_buff.data[i];
}
pfree(req_buff.data);
// If result is cached, memcpy and return
// for (int i = 0; i < page_cache_size; i++) {
// if (page_cache[i].request_hash == hash) {
// int len = page_cache[0].response->len;
// NeonResponse *resp = palloc0(len);
// // I'd rather Rc than memcpy, but this is not rust :(
// memcpy(resp, page_cache[0].response->data, len);
// return resp;
// }
// }
// Send request, get response
pageserver_send(request);
pageserver_flush();
return pageserver_receive();
NeonResponse *resp = pageserver_receive();
// Get length
int len = -1;
switch (resp->tag) {
case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); }
case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); }
case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; }
case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); }
case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); }
}
// Cache result
// page_cache[page_cache_head].request_hash = hash;
// page_cache[page_cache_head].response->len = len;
// // TODO free old result
// page_cache[page_cache_head].response->data = palloc0(len);
// memcpy(page_cache[page_cache_head].response->data, resp, len);
// page_cache_head = (page_cache_head + 1) % 20;
// if (page_cache_size < 20) {
// page_cache_size += 1;
// }
return resp;
}
page_server_api api = {

View File

@@ -107,7 +107,8 @@ class NeonCompare(PgCompare):
# self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant)
# self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
self._pg = self.env.postgres.create_start(branch_name, "main", self.tenant)
self._pg = self.env.postgres.create_start(
branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"])
# Long-lived cursor, useful for flushing
self.psconn = self.env.pageserver.connect()

View File

@@ -164,6 +164,11 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
@pytest.mark.parametrize("scale", get_scales_matrix())
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
# Run the pgbench tests, and generate a flamegraph from it
# This requires that the pageserver was built with the 'profiling' feature.
#