This commit is contained in:
Bojan Serafimov
2022-04-11 21:50:50 -04:00
parent d614291c44
commit fbc4206f2d
7 changed files with 116 additions and 240 deletions

View File

@@ -184,6 +184,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
// Initialize logger
let log_file = logging::init(LOG_FILE_NAME, daemonize)?;
// TODO init only if configured
pageserver::wal_metadata::init(conf).expect("wal_metadata init failed");
info!("version: {}", GIT_VERSION);
// TODO: Check that it looks like a valid repository before going further

View File

@@ -4,14 +4,14 @@
//! of the tester matters, and the API is easier to work with from rust.
use std::{collections::{HashMap, HashSet}, io::{BufRead, BufReader, Cursor}, net::SocketAddr, ops::AddAssign, time::Duration};
use byteorder::ReadBytesExt;
use itertools::Itertools;
use pageserver::wal_metadata::{Page, WalEntryMetadata};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use bytes::{BufMut, Bytes, BytesMut};
use clap::{App, Arg};
use std::fs::File;
use zenith_utils::{GIT_VERSION, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}};
use zenith_utils::{GIT_VERSION, lsn::Lsn, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}};
use std::time::Instant;
use plotly::{Histogram, Layout, Plot, Scatter};
use plotly::common::Mode;
use anyhow::Result;
@@ -79,131 +79,39 @@ pub async fn get_page(
Ok(page)
}
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug)]
pub struct Lsn(pub u64);
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug)]
pub struct Page {
spcnode: u32,
dbnode: u32,
relnode: u32,
forknum: u8,
blkno: u32,
}
impl Page {
async fn read<Reader>(buf: &mut Reader) -> Result<Page>
where
Reader: tokio::io::AsyncRead + Unpin,
{
let spcnode = buf.read_u32().await?;
let dbnode = buf.read_u32().await?;
let relnode = buf.read_u32().await?;
let forknum = buf.read_u8().await?;
let blkno = buf.read_u32().await?;
Ok(Page { spcnode, dbnode, relnode, forknum, blkno })
}
async fn write(&self, buf: &mut BytesMut) -> Result<()> {
buf.put_u32(self.spcnode);
buf.put_u32(self.dbnode);
buf.put_u32(self.relnode);
buf.put_u8(self.forknum);
buf.put_u32(self.blkno);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<()> {
let arg_matches = App::new("LALALA")
.about("lalala")
.version(GIT_VERSION)
.arg(
Arg::new("path")
.help("Path to file to dump")
Arg::new("wal_metadata_file")
.help("Path to wal metadata file")
.required(true)
.index(1),
)
.arg(
Arg::new("ps_connstr")
.help("Connection string to pageserver")
.required(true)
.index(2),
)
.arg(
Arg::new("tenant_hex")
.help("TODO")
.required(true)
.index(3),
.index(2),
)
.arg(
Arg::new("timeline")
.help("TODO")
.required(true)
.index(4),
.index(3),
)
.get_matches();
let log_file = arg_matches.value_of("path").unwrap();
let ps_connstr = arg_matches.value_of("ps_connstr").unwrap();
let metadata_file = arg_matches.value_of("wal_metadata_file").unwrap();
let tenant_hex = arg_matches.value_of("tenant_hex").unwrap();
let timeline = arg_matches.value_of("timeline").unwrap();
// Parse log lines
let relevant = read_lines_buffered(log_file).filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string()));
let mut wal_entry_sizes = HashMap::<Lsn, usize>::new();
let mut lsn_page_pairs = Vec::<(Lsn, Page)>::new();
for line in relevant {
let (lsn, rest) = line.split_once(" ").unwrap();
let (page, size) = rest.split_once(" ").unwrap();
let lsn = hex::decode(lsn)?;
let lsn = Lsn(AsyncReadExt::read_u64(&mut Cursor::new(lsn)).await?);
let page = hex::decode(page)?;
let page = Page::read(&mut Cursor::new(page)).await?;
let size: usize = size.parse().unwrap();
lsn_page_pairs.push((lsn, page));
wal_entry_sizes.insert(lsn, size);
}
lsn_page_pairs.sort();
// Organize write info
let mut writes_per_entry = HashMap::<Lsn, Vec<Page>>::new();
for (lsn, page) in lsn_page_pairs.clone() {
writes_per_entry.entry(lsn).or_insert(vec![]).push(page);
}
// Print some stats
let mut updates_per_page = HashMap::<Page, usize>::new();
for (_, page) in lsn_page_pairs.clone() {
updates_per_page.entry(page).or_insert(0).add_assign(1);
}
let mut updates_per_page: Vec<(&usize, &Page)> = updates_per_page
.iter().map(|(k, v)| (v, k)).collect();
updates_per_page.sort();
updates_per_page.reverse();
// dbg!(&updates_per_page);
dbg!(updates_per_page[0]);
let updated_pages: HashSet<&&Page> = updates_per_page.iter().filter_map(|(count, page)| {
if **count > 1 {
Some(page)
} else {
None
}
}).collect();
let hottest_page = updates_per_page[0].1;
let first_update = lsn_page_pairs
.iter()
.filter(|(_lsn, page)| page == hottest_page)
.map(|(lsn, _page)| lsn)
.min()
.unwrap();
let wal_metadata: Vec<WalEntryMetadata> = read_lines_buffered(metadata_file)
.map(|line| serde_json::from_str(&line).expect("corrupt metadata file"))
.collect();
// Get raw TCP connection to the pageserver postgres protocol port
let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?;
@@ -222,152 +130,30 @@ async fn main() -> Result<()> {
_ = client.query(init_query.as_str(), &[]) => (),
};
let entries = {
let mut entries: Vec<(Lsn, usize, Vec<Page>)> = vec![];
for (lsn, page) in lsn_page_pairs.clone() {
if let Some(last) = entries.last_mut() {
if last.0 == lsn {
last.2.push(page)
} else {
entries.push((lsn, wal_entry_sizes[&lsn], vec![page]));
}
} else {
entries.push((lsn, wal_entry_sizes[&lsn], vec![page]));
}
}
entries
};
// Derive some variables
let total_wal_size: usize = wal_metadata.iter().map(|m| m.size).sum();
let affected_pages: HashSet<_> = wal_metadata.iter().map(|m| m.affected_pages.clone())
.flatten().collect();
let latest_lsn = wal_metadata.iter().map(|m| m.lsn).max().unwrap();
println!("Total pages: {}", affected_pages.len());
println!("total wal: {}", wal_metadata.len());
println!("total wal bytes: {}", total_wal_size);
// Get all latest pages
let mut durations: Vec<Duration> = vec![];
let latest_lsn = entries.iter().map(|(lsn, _, _)| lsn).max().unwrap();
for (_, page) in updates_per_page.clone() {
for page in affected_pages {
let start = Instant::now();
let _page_bytes = get_page(&mut socket, latest_lsn, page, true).await?;
let _page_bytes = get_page(&mut socket, &latest_lsn, &page, true).await?;
let duration = start.elapsed();
durations.push(duration);
}
println!("Total pages: {}", updates_per_page.len());
durations.sort();
println!("Fastest: {:?}", durations.first().unwrap());
println!("Median: {:?}", durations[durations.len() / 2]);
println!("99th percentile: {:?}", durations[durations.len() - 1 - durations.len() / 100]);
println!("Slowest: {:?}", durations.last().unwrap());
return Ok(());
let mut plot = Plot::new();
plot.add_trace(Histogram::new(durations));
plot.show();
return Ok(());
let mut results: Vec<(Lsn, usize)> = vec![];
let mut page_data = HashMap::<Page, Vec<u8>>::new();
let mut total_dirty_units = 0;
let mut total_size = 0;
for (lsn, size, pages) in entries {
// if !updated_pages.contains(&&page) {
// continue;
// }
for page in pages {
let page_bytes = get_page(&mut socket, &lsn, &page, false).await?;
let dirty_units = if let Some(prev_bytes) = page_data.get(&page) {
let mut dirty_units = 0;
for unit_idx in 0..BYTES_IN_PAGE/64 {
let mut is_dirty = 0;
for offset in 0..64 {
let byte_idx = 64 * unit_idx + offset;
let xor = page_bytes[byte_idx] ^ prev_bytes[byte_idx];
if xor > 0 {
is_dirty = 1;
}
}
dirty_units += is_dirty;
}
if dirty_units > BYTES_IN_PAGE/64 {
panic!("wtf")
}
dirty_units
} else {
BYTES_IN_PAGE/64 // TODO we can do better
};
page_data.insert(page, page_bytes);
total_dirty_units += dirty_units;
}
total_size += size;
}
dbg!(total_dirty_units, total_size);
dbg!(((64 * total_dirty_units) as f64) / (total_size as f64));
// Returned:
// - 0.22 using 0 for new page dirty estimate
return Ok(());
results.sort();
let x: Vec<_> = results.iter().map(|(lsn, _)| lsn.0).collect();
let z: Vec<_> = results.iter().map(|(_, modified)| modified.clone()).collect();
// let modified_bits_trace = Scatter::new(x, z).name("modified_bits").mode(Mode::Lines);
let modified_bits_histogram = Histogram::new(z).name("modbits");
let mut plot = Plot::new();
// plot.add_trace(get_page_trace);
// plot.add_trace(modified_bits_trace);
plot.add_trace(modified_bits_histogram);
plot.show();
return Ok(());
// Do some warmup
let mut prev_page = get_page(&mut socket, &first_update, &hottest_page, false).await?;
let mut results: Vec<(Lsn, (Duration, usize))> = vec![];
for (i, (lsn, _pages)) in writes_per_entry.iter().enumerate() {
if lsn >= first_update {
// Just to speed up things
// if i % 1000 != 0 {
// continue
// }
// println!("Running get_page {:?} at {:?}", hottest_page, lsn);
let start = Instant::now();
let page = get_page(&mut socket, &lsn, &hottest_page, false).await?;
let duration = start.elapsed();
// TODO why is most modified page constant? Is this test correct?
let modified_bits = {
let mut modified_bits = 0;
for byte_idx in 0..BYTES_IN_PAGE {
let xor = page[byte_idx] ^ prev_page[byte_idx];
modified_bits = xor.count_ones() as usize;
}
modified_bits
};
prev_page = page;
results.push((lsn.clone(), (duration, modified_bits)));
// println!("Time: {:?}", duration);
}
}
results.sort();
let x: Vec<_> = results.iter().map(|(lsn, _)| lsn.0).collect();
let y: Vec<_> = results.iter().map(|(_, (duration, _))| duration.as_micros()).collect();
let z: Vec<_> = results.iter().map(|(_, (_, modified))| modified.clone()).collect();
let get_page_trace = Scatter::new(x.clone(), y).name("get_page").mode(Mode::Lines);
let modified_bits_trace = Scatter::new(x, z).name("modified_bits").mode(Mode::Lines);
let mut plot = Plot::new();
// plot.add_trace(get_page_trace);
plot.add_trace(modified_bits_trace);
plot.show();
Ok(())
}

View File

@@ -19,6 +19,7 @@ pub mod walingest;
pub mod walreceiver;
pub mod walrecord;
pub mod walredo;
pub mod wal_metadata;
use lazy_static::lazy_static;
use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};

View File

@@ -0,0 +1,78 @@
use anyhow::Result;
use once_cell::sync::OnceCell;
use zenith_utils::lsn::Lsn;
use std::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use bytes::{BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use crate::{config::PageServerConf, repository::Key, walrecord::DecodedBkpBlock};
pub static WAL_METADATA_FILE: OnceCell<File> = OnceCell::new();
/// Identifies a single Postgres page: the full relfilenode plus fork and
/// block number. Used as the per-page key in the wal_metadata log.
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub struct Page {
// Tablespace OID of the relation the page belongs to.
spcnode: u32,
// Database OID.
dbnode: u32,
// Relation OID.
relnode: u32,
// Fork number — presumably follows Postgres forknum numbering (main/FSM/VM); confirm against DecodedBkpBlock.
forknum: u8,
// Block number of the page within the fork.
blkno: u32,
}
impl Page {
    /// Deserialize a `Page` from `buf` in wire order:
    /// spcnode, dbnode, relnode (u32 each), then forknum (u8), then blkno (u32).
    pub async fn read<Reader>(buf: &mut Reader) -> Result<Page>
    where
        Reader: tokio::io::AsyncRead + Unpin,
    {
        // Struct-literal fields are evaluated left to right, so the reads
        // happen in exactly the wire order listed above.
        Ok(Page {
            spcnode: buf.read_u32().await?,
            dbnode: buf.read_u32().await?,
            relnode: buf.read_u32().await?,
            forknum: buf.read_u8().await?,
            blkno: buf.read_u32().await?,
        })
    }

    /// Append this page's fields to `buf` in the same wire order `read` expects.
    pub async fn write(&self, buf: &mut BytesMut) -> Result<()> {
        let Page { spcnode, dbnode, relnode, forknum, blkno } = *self;
        buf.put_u32(spcnode);
        buf.put_u32(dbnode);
        buf.put_u32(relnode);
        buf.put_u8(forknum);
        buf.put_u32(blkno);
        Ok(())
    }
}
/// Builds a `Page` key from a decoded WAL backup-block reference, copying
/// the relfilenode triple (spc/db/rel), fork number and block number.
impl From<&DecodedBkpBlock> for Page {
fn from(blk: &DecodedBkpBlock) -> Self {
Page {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
blkno: blk.blkno,
}
}
}
/// One record of the wal_metadata log: metadata about a single ingested
/// WAL entry. Serialized as one JSON object per line by `write`.
#[derive(Debug, Serialize, Deserialize)]
pub struct WalEntryMetadata {
// LSN of the WAL record.
pub lsn: Lsn,
// Size of the WAL record in bytes — presumably the record payload length; confirm at the walingest call site.
pub size: usize,
// All pages modified by this WAL record.
pub affected_pages: Vec<Page>,
}
/// Create the `wal_metadata.log` file under the pageserver workdir and
/// install it in the `WAL_METADATA_FILE` global so `write` can append to it.
///
/// Returns an error if the log file cannot be created.
///
/// # Panics
/// Panics if called more than once — the `OnceCell` can only be set once.
pub fn init(conf: &'static PageServerConf) -> Result<()> {
    // This is a file path, not a directory; the old name `wal_metadata_file_dir`
    // was misleading.
    let wal_metadata_path = conf.workdir.join("wal_metadata.log");
    WAL_METADATA_FILE
        .set(File::create(wal_metadata_path)?)
        // set() fails only when the cell is already populated, i.e. double init.
        .expect("wal_metadata::init called more than once");
    Ok(())
}
/// Append one `WalEntryMetadata` record to the metadata log as a single
/// newline-terminated JSON line. Silently does nothing when `init` was never
/// called, i.e. when metadata logging is not configured.
pub fn write(wal_meta: WalEntryMetadata) -> Result<()> {
    // Guard clause: no file installed means logging is disabled.
    let file = match WAL_METADATA_FILE.get() {
        Some(f) => f,
        None => return Ok(()),
    };
    // Serialize before touching the file so a serialization error writes nothing.
    let record = format!("{}\n", serde_json::to_string(&wal_meta)?);
    // `&File` implements `Write`, so a shared reference from the cell suffices.
    let mut sink = file;
    std::io::prelude::Write::write_all(&mut sink, record.as_bytes())?;
    Ok(())
}

View File

@@ -34,6 +34,7 @@ use std::collections::HashMap;
use crate::pgdatadir_mapping::*;
use crate::relish::*;
use crate::repository::Repository;
use crate::wal_metadata::WalEntryMetadata;
use crate::walrecord::*;
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::xlog_utils::*;
@@ -256,6 +257,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
hex::encode(bytes.freeze())
};
let page_hex = {
let foo: DecodedBkpBlock;
use bytes::BufMut;
let mut page = BytesMut::new();
page.put_u32(blk.rnode_spcnode);
@@ -270,6 +272,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
self.ingest_decoded_block(&mut writer, lsn, &decoded, blk)?;
}
// Emit wal entry metadata, if configured to do so
crate::wal_metadata::write(WalEntryMetadata {
lsn,
size: recdata_len,
affected_pages: decoded.blocks.iter().map(|blk| blk.into()).collect()
});
// If checkpoint data was updated, store the new version in the repository
if self.checkpoint_modified {
let new_checkpoint_bytes = self.checkpoint.encode();

View File

@@ -614,11 +614,10 @@ class ZenithEnv:
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
def run_psbench(self, timeline):
ps_log_filename = os.path.join(self.repo_dir, "pageserver.log")
ps_connstr = self.pageserver.connstr()
wal_metadata_filename = os.path.join(self.repo_dir, "wal_metadata.log")
psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
tenant_hex = self.initial_tenant.hex
args = [psbench_binpath, ps_log_filename, ps_connstr, tenant_hex, timeline]
args = [psbench_binpath, wal_metadata_filename, tenant_hex, timeline]
subprocess.run(args)
@cached_property

View File

@@ -18,7 +18,7 @@ def test_get_page(zenith_simple_env: ZenithEnv,
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
workload = "pgbench long"
workload = "pgbench"
print(f"Running workload {workload}")
if workload == "hot page":