mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 20:20:38 +00:00
Measure that materializing all versions bloats by 0.3
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
//!
|
||||
//! Usually it's easier to write python perf tests, but here the performance
|
||||
//! of the tester matters, and the API is easier to work with from rust.
|
||||
use std::{collections::HashMap, io::{BufRead, BufReader, Cursor}, net::SocketAddr, ops::AddAssign, time::Duration};
|
||||
use std::{collections::{HashMap, HashSet}, io::{BufRead, BufReader, Cursor}, net::SocketAddr, ops::AddAssign, time::Duration};
|
||||
use byteorder::ReadBytesExt;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
@@ -10,6 +10,8 @@ use clap::{App, Arg};
|
||||
use std::fs::File;
|
||||
use zenith_utils::{GIT_VERSION, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}};
|
||||
use std::time::Instant;
|
||||
use plotly::{Plot, Scatter, Histogram};
|
||||
use plotly::common::Mode;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
@@ -147,10 +149,12 @@ async fn main() -> Result<()> {
|
||||
let timeline = arg_matches.value_of("timeline").unwrap();
|
||||
|
||||
// Parse log lines
|
||||
let relevant = read_lines_buffered(log_file) .filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string()));
|
||||
let relevant = read_lines_buffered(log_file).filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string()));
|
||||
let mut wal_entry_sizes = HashMap::<Lsn, usize>::new();
|
||||
let mut lsn_page_pairs = Vec::<(Lsn, Page)>::new();
|
||||
for line in relevant {
|
||||
let (lsn, page) = line.split_once(" ").unwrap();
|
||||
let (lsn, rest) = line.split_once(" ").unwrap();
|
||||
let (page, size) = rest.split_once(" ").unwrap();
|
||||
|
||||
let lsn = hex::decode(lsn)?;
|
||||
let lsn = Lsn(AsyncReadExt::read_u64(&mut Cursor::new(lsn)).await?);
|
||||
@@ -158,8 +162,12 @@ async fn main() -> Result<()> {
|
||||
let page = hex::decode(page)?;
|
||||
let page = Page::read(&mut Cursor::new(page)).await?;
|
||||
|
||||
lsn_page_pairs.push((lsn, page))
|
||||
let size: usize = size.parse().unwrap();
|
||||
|
||||
lsn_page_pairs.push((lsn, page));
|
||||
wal_entry_sizes.insert(lsn, size);
|
||||
}
|
||||
lsn_page_pairs.sort();
|
||||
|
||||
// Organize write info
|
||||
let mut writes_per_entry = HashMap::<Lsn, Vec<Page>>::new();
|
||||
@@ -179,6 +187,14 @@ async fn main() -> Result<()> {
|
||||
// dbg!(&updates_per_page);
|
||||
dbg!(updates_per_page[0]);
|
||||
|
||||
let updated_pages: HashSet<&&Page> = updates_per_page.iter().filter_map(|(count, page)| {
|
||||
if **count > 1 {
|
||||
Some(page)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let hottest_page = updates_per_page[0].1;
|
||||
let first_update = lsn_page_pairs
|
||||
.iter()
|
||||
@@ -204,11 +220,80 @@ async fn main() -> Result<()> {
|
||||
_ = client.query(init_query.as_str(), &[]) => (),
|
||||
};
|
||||
|
||||
// TODO merge with LSM branch. Nothing to test otherwise, too many images.
|
||||
// - I get error: tried to request a page version that was garbage collected
|
||||
// TODO be mindful of caching, take multiple measurements, use monotonic time.
|
||||
// TODO make harder test case. More writes, fewer images.
|
||||
// TODO concurrent requests: multiple reads, also writes.
|
||||
let entries = {
|
||||
let mut entries: Vec<(Lsn, usize, Vec<Page>)> = vec![];
|
||||
for (lsn, page) in lsn_page_pairs.clone() {
|
||||
if let Some(last) = entries.last_mut() {
|
||||
if last.0 == lsn {
|
||||
last.2.push(page)
|
||||
} else {
|
||||
entries.push((lsn, wal_entry_sizes[&lsn], vec![page]));
|
||||
}
|
||||
} else {
|
||||
entries.push((lsn, wal_entry_sizes[&lsn], vec![page]));
|
||||
}
|
||||
}
|
||||
entries
|
||||
};
|
||||
|
||||
let mut results: Vec<(Lsn, usize)> = vec![];
|
||||
let mut page_data = HashMap::<Page, Vec<u8>>::new();
|
||||
let mut total_dirty_units = 0;
|
||||
let mut total_size = 0;
|
||||
for (lsn, size, pages) in entries {
|
||||
// if !updated_pages.contains(&&page) {
|
||||
// continue;
|
||||
// }
|
||||
for page in pages {
|
||||
let page_bytes = get_page(&mut socket, &lsn, &page).await?;
|
||||
|
||||
let dirty_units = if let Some(prev_bytes) = page_data.get(&page) {
|
||||
let mut dirty_units = 0;
|
||||
for unit_idx in 0..BYTES_IN_PAGE/64 {
|
||||
let mut is_dirty = 0;
|
||||
for offset in 0..64 {
|
||||
let byte_idx = 64 * unit_idx + offset;
|
||||
let xor = page_bytes[byte_idx] ^ prev_bytes[byte_idx];
|
||||
if xor > 0 {
|
||||
is_dirty = 1;
|
||||
}
|
||||
}
|
||||
dirty_units += is_dirty;
|
||||
}
|
||||
if dirty_units > BYTES_IN_PAGE/64 {
|
||||
panic!("wtf")
|
||||
}
|
||||
dirty_units
|
||||
} else {
|
||||
BYTES_IN_PAGE/64 // TODO we can do better
|
||||
};
|
||||
page_data.insert(page, page_bytes);
|
||||
|
||||
total_dirty_units += dirty_units;
|
||||
}
|
||||
total_size += size;
|
||||
}
|
||||
|
||||
dbg!(total_dirty_units, total_size);
|
||||
dbg!(((64 * total_dirty_units) as f64) / (total_size as f64));
|
||||
// Returned:
|
||||
// - 0.22 using 0 for new page dirty estimate
|
||||
return Ok(());
|
||||
|
||||
results.sort();
|
||||
let x: Vec<_> = results.iter().map(|(lsn, _)| lsn.0).collect();
|
||||
let z: Vec<_> = results.iter().map(|(_, modified)| modified.clone()).collect();
|
||||
|
||||
// let modified_bits_trace = Scatter::new(x, z).name("modified_bits").mode(Mode::Lines);
|
||||
let modified_bits_histogram = Histogram::new(z).name("modbits");
|
||||
|
||||
let mut plot = Plot::new();
|
||||
// plot.add_trace(get_page_trace);
|
||||
// plot.add_trace(modified_bits_trace);
|
||||
plot.add_trace(modified_bits_histogram);
|
||||
plot.show();
|
||||
|
||||
return Ok(());
|
||||
|
||||
// Do some warmup
|
||||
let mut prev_page = get_page(&mut socket, &first_update, &hottest_page).await?;
|
||||
@@ -247,8 +332,6 @@ async fn main() -> Result<()> {
|
||||
let y: Vec<_> = results.iter().map(|(_, (duration, _))| duration.as_micros()).collect();
|
||||
let z: Vec<_> = results.iter().map(|(_, (_, modified))| modified.clone()).collect();
|
||||
|
||||
use plotly::{Plot, Scatter};
|
||||
use plotly::common::Mode;
|
||||
let get_page_trace = Scatter::new(x.clone(), y).name("get_page").mode(Mode::Lines);
|
||||
let modified_bits_trace = Scatter::new(x, z).name("modified_bits").mode(Mode::Lines);
|
||||
|
||||
|
||||
@@ -83,6 +83,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
) -> Result<()> {
|
||||
let mut writer = timeline.begin_record(lsn);
|
||||
|
||||
let recdata_len = recdata.len();
|
||||
let mut decoded = decode_wal_record(recdata);
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
@@ -264,7 +265,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
page.put_u32(blk.blkno);
|
||||
hex::encode(page.freeze())
|
||||
};
|
||||
println!("wal-at-lsn-modified-page {} {}", lsn_hex, page_hex);
|
||||
println!("wal-at-lsn-modified-page {} {} {}", lsn_hex, page_hex, recdata_len);
|
||||
|
||||
self.ingest_decoded_block(&mut writer, lsn, &decoded, blk)?;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user