From 531561497453e51bc68ad2e5c2f2943f57db8ef1 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Wed, 5 Oct 2022 13:21:42 -0400 Subject: [PATCH] WIP --- control_plane/src/storage.rs | 6 +- pageserver/src/bin/draw_trace.rs | 96 ++++++++++++++++---- pageserver/src/page_service.rs | 2 +- pgxn/neon/libpagestore.c | 61 ++++++++++++- test_runner/fixtures/compare_fixtures.py | 3 +- test_runner/performance/test_perf_pgbench.py | 5 + 6 files changed, 152 insertions(+), 21 deletions(-) diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 6ef85319b2..6c79860404 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -176,9 +176,11 @@ impl PageServerNode { new_tenant_id: Option, new_timeline_id: Option, ) -> anyhow::Result { - let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?; + let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new()) + .context("failed to create tenant")?; let initial_timeline_info = - self.timeline_create(initial_tenant_id, new_timeline_id, None, None)?; + self.timeline_create(initial_tenant_id, new_timeline_id, None, None) + .context("failed to create timeline")?; Ok(initial_timeline_info.timeline_id) } diff --git a/pageserver/src/bin/draw_trace.rs b/pageserver/src/bin/draw_trace.rs index 0e0453a4a3..d7d822c32d 100644 --- a/pageserver/src/bin/draw_trace.rs +++ b/pageserver/src/bin/draw_trace.rs @@ -1,10 +1,12 @@ use clap::{App, Arg}; +use futures::TryFutureExt; use pageserver::{page_service::PagestreamFeMessage, repository::Key}; + +use std::{collections::{BTreeMap, BTreeSet, HashMap}, ops::Range, path::PathBuf}; use std::io::Write; use std::{ fs::{read_dir, File}, io::BufReader, - path::PathBuf, str::FromStr, }; use svg_fmt::*; @@ -14,6 +16,17 @@ use utils::{ pq_proto::{BeMessage, FeMessage}, }; +fn analyze(coords: Vec) -> (usize, BTreeMap) { + let set: BTreeSet = coords.into_iter().collect(); + + let mut map: BTreeMap = BTreeMap::new(); + for (i, e) in set.iter().enumerate() { + map.insert(*e, i); + } + + (set.len(), map) +} + fn main() -> anyhow::Result<()> { // TODO upgrade to struct macro arg parsing let arg_matches = App::new("Pageserver trace visualization tool") @@ -28,6 +41,13 @@ fn main() -> anyhow::Result<()> { // (blkno, lsn) let mut dots = Vec::<(u32, Lsn)>::new(); + + let mut dump_file = File::create("dump.txt").expect("can't make file"); + let mut deltas = HashMap::::new(); + let mut prev1: u32 = 0; + let mut prev2: u32 = 0; + let mut prev3: u32 = 0; + println!("scanning trace ..."); let traces_dir = PathBuf::from(arg_matches.value_of("traces_dir").unwrap()); for tenant_dir in read_dir(traces_dir)? { @@ -58,24 +78,66 @@ fn main() -> anyhow::Result<()> { match msg { PagestreamFeMessage::Exists(_) => {} PagestreamFeMessage::Nblocks(_) => {} - PagestreamFeMessage::GetPage(req) => dots.push((req.blkno, req.lsn)), + PagestreamFeMessage::GetPage(req) => { + writeln!(&mut dump_file, "{} {} {}", req.rel, req.blkno, req.lsn)?; + // dots.push((req.blkno, req.lsn)); + // HACK + dots.push((req.blkno, Lsn::from(dots.len() as u64))); + + let delta1 = (req.blkno as i32) - (prev1 as i32); + let delta2 = (req.blkno as i32) - (prev2 as i32); + let delta3 = (req.blkno as i32) - (prev3 as i32); + let mut delta = if i32::abs(delta1) < i32::abs(delta2) { + delta1 + } else { + delta2 + }; + if i32::abs(delta3) < i32::abs(delta) { + delta = delta3; + } + + prev3 = prev2; + prev2 = prev1; + prev1 = req.blkno; + + match deltas.get_mut(&delta) { + Some(c) => {*c += 1;}, + None => {deltas.insert(delta, 1);}, + }; + + if delta == 9 { + println!("{} {} {} {}", dots.len(), req.rel, req.blkno, req.lsn); + } + }, PagestreamFeMessage::DbSize(_) => {} }; // HACK - if dots.len() > 100 { - break; - } + // if dots.len() > 1000 { + // break; + // } } } } } + let mut other = deltas.len(); + deltas.retain(|_, count| *count > 3); + other -= deltas.len(); + dbg!(other); + dbg!(deltas); + + // Collect all coordinates + let mut keys: Vec = vec![]; + let mut lsns: Vec = vec![]; + for dot in &dots { + keys.push(dot.0); + lsns.push(dot.1); + } + // Analyze - let blkno_max = (&dots).into_iter().map(|(blkno, _)| blkno).max().unwrap(); - let blkno_min = (&dots).into_iter().map(|(blkno, _)| blkno).min().unwrap(); - let lsn_max = (&dots).into_iter().map(|(_, lsn)| lsn).max().unwrap(); - let lsn_min = (&dots).into_iter().map(|(_, lsn)| lsn).min().unwrap(); + let (key_max, key_map) = analyze(keys); + let (lsn_max, lsn_map) = analyze(lsns); // Draw println!("drawing trace ..."); @@ -84,19 +146,21 @@ fn main() -> anyhow::Result<()> { &mut svg_file, "{}", BeginSvg { - w: (blkno_max - blkno_min + 1) as f32, - h: (lsn_max.0 - lsn_min.0 + 1) as f32, + w: (key_max + 1) as f32, + h: (lsn_max + 1) as f32, } )?; - for dot in &dots { + for (key, lsn) in &dots { + let key = key_map.get(&key).unwrap(); + let lsn = lsn_map.get(&lsn).unwrap(); writeln!( &mut svg_file, " {}", rectangle( - (dot.0 - blkno_min) as f32, - (dot.1 .0 - lsn_min.0) as f32, - 1.0, - 1.0 + *key as f32, + *lsn as f32, + 10.0, + 10.0 ) .fill(Fill::Color(red())) .stroke(Stroke::Color(black(), 0.0)) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index cbb13d3cc4..b0b26b4f44 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -88,7 +88,7 @@ pub struct PagestreamNblocksRequest { pub struct PagestreamGetPageRequest { latest: bool, pub lsn: Lsn, - rel: RelTag, + pub rel: RelTag, pub blkno: u32, } diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 296865838d..3f74fd7234 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -264,12 +264,71 @@ pageserver_flush(void) } } +// Entry of the page_cache +typedef struct +{ + // HACK Just xor of bytes lol + char request_hash; + + // Points directly to a NeonResponse. We can't just own the + // NeonResponse because it's a "supertype", so it's not Sized. + StringInfo response; +} NeonRequestResponse; + +NeonRequestResponse page_cache[20]; +int page_cache_size = 0; +int page_cache_head = 0; + static NeonResponse * pageserver_call(NeonRequest * request) { + // Compute hash + char hash = 0; + StringInfoData req_buff; + req_buff = zm_pack_request(request); + for (int i = 0; i < req_buff.len; i++) { + hash ^= req_buff.data[i]; + } + pfree(req_buff.data); + + // If result is cached, memcpy and return + // for (int i = 0; i < page_cache_size; i++) { + // if (page_cache[i].request_hash == hash) { + // int len = page_cache[0].response->len; + // NeonResponse *resp = palloc0(len); + // // I'd rather Rc than memcpy, but this is not rust :( + // memcpy(resp, page_cache[0].response->data, len); + // return resp; + // } + // } + + // Send request, get response pageserver_send(request); pageserver_flush(); - return pageserver_receive(); + NeonResponse *resp = pageserver_receive(); + + // Get length + int len = -1; + switch (resp->tag) { + case T_NeonExistsResponse: { len = sizeof(NeonExistsResponse); } + case T_NeonNblocksResponse: { len = sizeof(NeonNblocksResponse); } + case T_NeonGetPageResponse: { len = offsetof(NeonGetPageResponse, page) + BLCKSZ; } + case T_NeonDbSizeResponse: { len = sizeof(NeonDbSizeResponse); } + case T_NeonErrorResponse: { len = sizeof(NeonErrorResponse); } + } + + // Cache result + // page_cache[page_cache_head].request_hash = hash; + // page_cache[page_cache_head].response->len = len; + // // TODO free old result + // page_cache[page_cache_head].response->data = palloc0(len); + // memcpy(page_cache[page_cache_head].response->data, resp, len); + // page_cache_head = (page_cache_head + 1) % 20; + // if (page_cache_size < 20) { + // page_cache_size += 1; + // } + + return resp; } page_server_api api = { diff --git a/test_runner/fixtures/compare_fixtures.py b/test_runner/fixtures/compare_fixtures.py index ca4786143b..b61fd2d975 100644 --- a/test_runner/fixtures/compare_fixtures.py +++ b/test_runner/fixtures/compare_fixtures.py @@ -107,7 +107,8 @@ class NeonCompare(PgCompare): # self.env.neon_cli.create_branch(branch_name, "empty", tenant_id=self.tenant) # self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0] self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant) - self._pg = self.env.postgres.create_start(branch_name, "main", self.tenant) + self._pg = self.env.postgres.create_start( + branch_name, "main", self.tenant, config_lines=["shared_buffers=2GB"]) # Long-lived cursor, useful for flushing self.psconn = self.env.pageserver.connect() diff --git a/test_runner/performance/test_perf_pgbench.py b/test_runner/performance/test_perf_pgbench.py index 2a2213b783..d7aa1911b9 100644 --- a/test_runner/performance/test_perf_pgbench.py +++ b/test_runner/performance/test_perf_pgbench.py @@ -164,6 +164,11 @@ def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int): run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY) +@pytest.mark.parametrize("scale", get_scales_matrix()) +@pytest.mark.parametrize("duration", get_durations_matrix()) +def test_pgbench_init(neon_with_baseline: PgCompare, scale: int, duration: int): + run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT) + # Run the pgbench tests, and generate a flamegraph from it # This requires that the pageserver was built with the 'profiling' feature. #