From fbc4206f2d9742d6f561218846cb240e36eb63e3 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Mon, 11 Apr 2022 21:50:50 -0400 Subject: [PATCH] Simplify --- pageserver/src/bin/pageserver.rs | 3 + pageserver/src/bin/psbench.rs | 258 ++------------------- pageserver/src/lib.rs | 1 + pageserver/src/wal_metadata.rs | 78 +++++++ pageserver/src/walingest.rs | 9 + test_runner/fixtures/zenith_fixtures.py | 5 +- test_runner/performance/test_pageserver.py | 2 +- 7 files changed, 116 insertions(+), 240 deletions(-) create mode 100644 pageserver/src/wal_metadata.rs diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d37ba0cece..ed5d041e08 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -184,6 +184,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() // Initialize logger let log_file = logging::init(LOG_FILE_NAME, daemonize)?; + // TODO init only if configured + pageserver::wal_metadata::init(conf).expect("wal_metadata init failed"); + info!("version: {}", GIT_VERSION); // TODO: Check that it looks like a valid repository before going further diff --git a/pageserver/src/bin/psbench.rs b/pageserver/src/bin/psbench.rs index 8d0921c07d..a4aea28f9c 100644 --- a/pageserver/src/bin/psbench.rs +++ b/pageserver/src/bin/psbench.rs @@ -4,14 +4,14 @@ //! of the tester matters, and the API is easier to work with from rust. use std::{collections::{HashMap, HashSet}, io::{BufRead, BufReader, Cursor}, net::SocketAddr, ops::AddAssign, time::Duration}; use byteorder::ReadBytesExt; +use itertools::Itertools; +use pageserver::wal_metadata::{Page, WalEntryMetadata}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use bytes::{BufMut, Bytes, BytesMut}; use clap::{App, Arg}; use std::fs::File; -use zenith_utils::{GIT_VERSION, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}}; +use zenith_utils::{GIT_VERSION, lsn::Lsn, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}}; use std::time::Instant; -use plotly::{Histogram, Layout, Plot, Scatter}; -use plotly::common::Mode; use anyhow::Result; @@ -79,131 +79,39 @@ pub async fn get_page( Ok(page) } -#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug)] -pub struct Lsn(pub u64); - -#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug)] -pub struct Page { - spcnode: u32, - dbnode: u32, - relnode: u32, - forknum: u8, - blkno: u32, -} - -impl Page { - async fn read(buf: &mut Reader) -> Result - where - Reader: tokio::io::AsyncRead + Unpin, - { - let spcnode = buf.read_u32().await?; - let dbnode = buf.read_u32().await?; - let relnode = buf.read_u32().await?; - let forknum = buf.read_u8().await?; - let blkno = buf.read_u32().await?; - Ok(Page { spcnode, dbnode, relnode, forknum, blkno }) - } - - async fn write(&self, buf: &mut BytesMut) -> Result<()> { - buf.put_u32(self.spcnode); - buf.put_u32(self.dbnode); - buf.put_u32(self.relnode); - buf.put_u8(self.forknum); - buf.put_u32(self.blkno); - Ok(()) - } -} - #[tokio::main] async fn main() -> Result<()> { let arg_matches = App::new("LALALA") .about("lalala") .version(GIT_VERSION) .arg( - Arg::new("path") - .help("Path to file to dump") + Arg::new("wal_metadata_file") + .help("Path to wal metadata file") .required(true) .index(1), ) - .arg( - Arg::new("ps_connstr") - .help("Connection string to pageserver") - .required(true) - .index(2), - ) .arg( Arg::new("tenant_hex") .help("TODO") .required(true) - .index(3), + .index(2), ) .arg( Arg::new("timeline") .help("TODO") .required(true) - .index(4), + .index(3), ) .get_matches(); - let log_file = arg_matches.value_of("path").unwrap(); - let ps_connstr = arg_matches.value_of("ps_connstr").unwrap(); + let metadata_file = arg_matches.value_of("wal_metadata_file").unwrap(); let tenant_hex = arg_matches.value_of("tenant_hex").unwrap(); let timeline = arg_matches.value_of("timeline").unwrap(); // Parse log lines - let relevant = read_lines_buffered(log_file).filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string())); - let mut wal_entry_sizes = HashMap::::new(); - let mut lsn_page_pairs = Vec::<(Lsn, Page)>::new(); - for line in relevant { - let (lsn, rest) = line.split_once(" ").unwrap(); - let (page, size) = rest.split_once(" ").unwrap(); - - let lsn = hex::decode(lsn)?; - let lsn = Lsn(AsyncReadExt::read_u64(&mut Cursor::new(lsn)).await?); - - let page = hex::decode(page)?; - let page = Page::read(&mut Cursor::new(page)).await?; - - let size: usize = size.parse().unwrap(); - - lsn_page_pairs.push((lsn, page)); - wal_entry_sizes.insert(lsn, size); - } - lsn_page_pairs.sort(); - - // Organize write info - let mut writes_per_entry = HashMap::>::new(); - for (lsn, page) in lsn_page_pairs.clone() { - writes_per_entry.entry(lsn).or_insert(vec![]).push(page); - } - - // Print some stats - let mut updates_per_page = HashMap::::new(); - for (_, page) in lsn_page_pairs.clone() { - updates_per_page.entry(page).or_insert(0).add_assign(1); - } - let mut updates_per_page: Vec<(&usize, &Page)> = updates_per_page - .iter().map(|(k, v)| (v, k)).collect(); - updates_per_page.sort(); - updates_per_page.reverse(); - // dbg!(&updates_per_page); - dbg!(updates_per_page[0]); - - let updated_pages: HashSet<&&Page> = updates_per_page.iter().filter_map(|(count, page)| { - if **count > 1 { - Some(page) - } else { - None - } - }).collect(); - - let hottest_page = updates_per_page[0].1; - let first_update = lsn_page_pairs - .iter() - .filter(|(_lsn, page)| page == hottest_page) - .map(|(lsn, _page)| lsn) - .min() - .unwrap(); + let wal_metadata: Vec = read_lines_buffered(metadata_file) + .map(|line| serde_json::from_str(&line).expect("corrupt metadata file")) + .collect(); // Get raw TCP connection to the pageserver postgres protocol port let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?; @@ -222,152 +130,30 @@ async fn main() -> Result<()> { _ = client.query(init_query.as_str(), &[]) => (), }; - let entries = { - let mut entries: Vec<(Lsn, usize, Vec)> = vec![]; - for (lsn, page) in lsn_page_pairs.clone() { - if let Some(last) = entries.last_mut() { - if last.0 == lsn { - last.2.push(page) - } else { - entries.push((lsn, wal_entry_sizes[&lsn], vec![page])); - } - } else { - entries.push((lsn, wal_entry_sizes[&lsn], vec![page])); - } - } - entries - }; + // Derive some variables + let total_wal_size: usize = wal_metadata.iter().map(|m| m.size).sum(); + let affected_pages: HashSet<_> = wal_metadata.iter().map(|m| m.affected_pages.clone()) + .flatten().collect(); + let latest_lsn = wal_metadata.iter().map(|m| m.lsn).max().unwrap(); + println!("Total pages: {}", affected_pages.len()); + println!("total wal: {}", wal_metadata.len()); + println!("total wal bytes: {}", total_wal_size); + // Get all latest pages let mut durations: Vec = vec![]; - let latest_lsn = entries.iter().map(|(lsn, _, _)| lsn).max().unwrap(); - for (_, page) in updates_per_page.clone() { + for page in affected_pages { let start = Instant::now(); - let _page_bytes = get_page(&mut socket, latest_lsn, page, true).await?; + let _page_bytes = get_page(&mut socket, &latest_lsn, &page, true).await?; let duration = start.elapsed(); durations.push(duration); } - println!("Total pages: {}", updates_per_page.len()); - durations.sort(); println!("Fastest: {:?}", durations.first().unwrap()); println!("Median: {:?}", durations[durations.len() / 2]); println!("99th percentile: {:?}", durations[durations.len() - 1 - durations.len() / 100]); println!("Slowest: {:?}", durations.last().unwrap()); - - return Ok(()); - let mut plot = Plot::new(); - plot.add_trace(Histogram::new(durations)); - plot.show(); - - return Ok(()); - - - let mut results: Vec<(Lsn, usize)> = vec![]; - let mut page_data = HashMap::>::new(); - let mut total_dirty_units = 0; - let mut total_size = 0; - for (lsn, size, pages) in entries { - // if !updated_pages.contains(&&page) { - // continue; - // } - for page in pages { - let page_bytes = get_page(&mut socket, &lsn, &page, false).await?; - - let dirty_units = if let Some(prev_bytes) = page_data.get(&page) { - let mut dirty_units = 0; - for unit_idx in 0..BYTES_IN_PAGE/64 { - let mut is_dirty = 0; - for offset in 0..64 { - let byte_idx = 64 * unit_idx + offset; - let xor = page_bytes[byte_idx] ^ prev_bytes[byte_idx]; - if xor > 0 { - is_dirty = 1; - } - } - dirty_units += is_dirty; - } - if dirty_units > BYTES_IN_PAGE/64 { - panic!("wtf") - } - dirty_units - } else { - BYTES_IN_PAGE/64 // TODO we can do better - }; - page_data.insert(page, page_bytes); - - total_dirty_units += dirty_units; - } - total_size += size; - } - - dbg!(total_dirty_units, total_size); - dbg!(((64 * total_dirty_units) as f64) / (total_size as f64)); - // Returned: - // - 0.22 using 0 for new page dirty estimate - return Ok(()); - - results.sort(); - let x: Vec<_> = results.iter().map(|(lsn, _)| lsn.0).collect(); - let z: Vec<_> = results.iter().map(|(_, modified)| modified.clone()).collect(); - - // let modified_bits_trace = Scatter::new(x, z).name("modified_bits").mode(Mode::Lines); - let modified_bits_histogram = Histogram::new(z).name("modbits"); - - let mut plot = Plot::new(); - // plot.add_trace(get_page_trace); - // plot.add_trace(modified_bits_trace); - plot.add_trace(modified_bits_histogram); - plot.show(); - - return Ok(()); - - // Do some warmup - let mut prev_page = get_page(&mut socket, &first_update, &hottest_page, false).await?; - - let mut results: Vec<(Lsn, (Duration, usize))> = vec![]; - for (i, (lsn, _pages)) in writes_per_entry.iter().enumerate() { - if lsn >= first_update { - - // Just to speed up things - // if i % 1000 != 0 { - // continue - // } - - // println!("Running get_page {:?} at {:?}", hottest_page, lsn); - let start = Instant::now(); - let page = get_page(&mut socket, &lsn, &hottest_page, false).await?; - let duration = start.elapsed(); - - // TODO why is most modified page constant? Is this test correct? - let modified_bits = { - let mut modified_bits = 0; - for byte_idx in 0..BYTES_IN_PAGE { - let xor = page[byte_idx] ^ prev_page[byte_idx]; - modified_bits = xor.count_ones() as usize; - } - modified_bits - }; - prev_page = page; - - results.push((lsn.clone(), (duration, modified_bits))); - // println!("Time: {:?}", duration); - } - } - results.sort(); - let x: Vec<_> = results.iter().map(|(lsn, _)| lsn.0).collect(); - let y: Vec<_> = results.iter().map(|(_, (duration, _))| duration.as_micros()).collect(); - let z: Vec<_> = results.iter().map(|(_, (_, modified))| modified.clone()).collect(); - - let get_page_trace = Scatter::new(x.clone(), y).name("get_page").mode(Mode::Lines); - let modified_bits_trace = Scatter::new(x, z).name("modified_bits").mode(Mode::Lines); - - let mut plot = Plot::new(); - // plot.add_trace(get_page_trace); - plot.add_trace(modified_bits_trace); - plot.show(); - Ok(()) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index fd38ba9d70..be8cc9122b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -19,6 +19,7 @@ pub mod walingest; pub mod walreceiver; pub mod walrecord; pub mod walredo; +pub mod wal_metadata; use lazy_static::lazy_static; use zenith_metrics::{register_int_gauge_vec, IntGaugeVec}; diff --git a/pageserver/src/wal_metadata.rs b/pageserver/src/wal_metadata.rs new file mode 100644 index 0000000000..0015c54b13 --- /dev/null +++ b/pageserver/src/wal_metadata.rs @@ -0,0 +1,78 @@ +use anyhow::Result; +use once_cell::sync::OnceCell; +use zenith_utils::lsn::Lsn; +use std::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use bytes::{BufMut, Bytes, BytesMut}; +use serde::{Deserialize, Serialize}; + +use crate::{config::PageServerConf, repository::Key, walrecord::DecodedBkpBlock}; + +pub static WAL_METADATA_FILE: OnceCell = OnceCell::new(); + +#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, Serialize, Deserialize)] +pub struct Page { + spcnode: u32, + dbnode: u32, + relnode: u32, + forknum: u8, + blkno: u32, +} + +impl Page { + pub async fn read(buf: &mut Reader) -> Result + where + Reader: tokio::io::AsyncRead + Unpin, + { + let spcnode = buf.read_u32().await?; + let dbnode = buf.read_u32().await?; + let relnode = buf.read_u32().await?; + let forknum = buf.read_u8().await?; + let blkno = buf.read_u32().await?; + Ok(Page { spcnode, dbnode, relnode, forknum, blkno }) + } + + pub async fn write(&self, buf: &mut BytesMut) -> Result<()> { + buf.put_u32(self.spcnode); + buf.put_u32(self.dbnode); + buf.put_u32(self.relnode); + buf.put_u8(self.forknum); + buf.put_u32(self.blkno); + Ok(()) + } +} + +impl From<&DecodedBkpBlock> for Page { + fn from(blk: &DecodedBkpBlock) -> Self { + Page { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum, + blkno: blk.blkno, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WalEntryMetadata { + pub lsn: Lsn, + pub size: usize, + pub affected_pages: Vec, +} + +pub fn init(conf: &'static PageServerConf) -> Result<()> { + let wal_metadata_file_dir = conf.workdir.join("wal_metadata.log"); + WAL_METADATA_FILE.set(File::create(wal_metadata_file_dir)?) + .expect("wal_metadata file is already created"); + Ok(()) +} + +pub fn write(wal_meta: WalEntryMetadata) -> Result<()> { + if let Some(mut file) = WAL_METADATA_FILE.get() { + let mut line = serde_json::to_string(&wal_meta)?; + line.push('\n'); + std::io::prelude::Write::write_all(&mut file, line.as_bytes())?; + } + Ok(()) +} diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index d5d9ef4da5..4b9d73a2b7 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -34,6 +34,7 @@ use std::collections::HashMap; use crate::pgdatadir_mapping::*; use crate::relish::*; use crate::repository::Repository; +use crate::wal_metadata::WalEntryMetadata; use crate::walrecord::*; use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment; use postgres_ffi::xlog_utils::*; @@ -256,6 +257,7 @@ impl<'a, R: Repository> WalIngest<'a, R> { hex::encode(bytes.freeze()) }; let page_hex = { + let foo: DecodedBkpBlock; use bytes::BufMut; let mut page = BytesMut::new(); page.put_u32(blk.rnode_spcnode); @@ -270,6 +272,13 @@ impl<'a, R: Repository> WalIngest<'a, R> { self.ingest_decoded_block(&mut writer, lsn, &decoded, blk)?; } + // Emit wal entry metadata, if configured to do so + crate::wal_metadata::write(WalEntryMetadata { + lsn, + size: recdata_len, + affected_pages: decoded.blocks.iter().map(|blk| blk.into()).collect() + }); + // If checkpoint data was updated, store the new version in the repository if self.checkpoint_modified { let new_checkpoint_bytes = self.checkpoint.encode(); diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index afb129237b..f4749e07a3 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -614,11 +614,10 @@ class ZenithEnv: return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers]) def run_psbench(self, timeline): - ps_log_filename = os.path.join(self.repo_dir, "pageserver.log") - ps_connstr = self.pageserver.connstr() + wal_metadata_filename = os.path.join(self.repo_dir, "wal_metadata.log") psbench_binpath = os.path.join(str(zenith_binpath), 'psbench') tenant_hex = self.initial_tenant.hex - args = [psbench_binpath, ps_log_filename, ps_connstr, tenant_hex, timeline] + args = [psbench_binpath, wal_metadata_filename, tenant_hex, timeline] subprocess.run(args) @cached_property diff --git a/test_runner/performance/test_pageserver.py b/test_runner/performance/test_pageserver.py index 6590cf0a9d..9628aa5067 100644 --- a/test_runner/performance/test_pageserver.py +++ b/test_runner/performance/test_pageserver.py @@ -18,7 +18,7 @@ def test_get_page(zenith_simple_env: ZenithEnv, with closing(pg.connect()) as conn: with conn.cursor() as cur: - workload = "pgbench long" + workload = "pgbench" print(f"Running workload {workload}") if workload == "hot page":