mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 05:30:37 +00:00
Count modified bits per wal (WIP)
This commit is contained in:
@@ -13,6 +13,8 @@ use std::time::Instant;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
const BYTES_IN_PAGE: usize = 8 * 1024;
|
||||
|
||||
pub fn read_lines_buffered(file_name: &str) -> impl Iterator<Item = String> {
|
||||
BufReader::new(File::open(file_name).unwrap())
|
||||
.lines()
|
||||
@@ -55,7 +57,7 @@ pub async fn get_page(
|
||||
102 => {
|
||||
let mut page = Vec::<u8>::new();
|
||||
cursor.read_to_end(&mut page).await?;
|
||||
if page.len() != 8 * 1024 {
|
||||
if page.len() != BYTES_IN_PAGE {
|
||||
panic!("Expected 8kb page, got: {:?}", page.len());
|
||||
}
|
||||
page
|
||||
@@ -174,7 +176,8 @@ async fn main() -> Result<()> {
|
||||
.iter().map(|(k, v)| (v, k)).collect();
|
||||
updates_per_page.sort();
|
||||
updates_per_page.reverse();
|
||||
dbg!(&updates_per_page);
|
||||
// dbg!(&updates_per_page);
|
||||
dbg!(updates_per_page[0]);
|
||||
|
||||
let hottest_page = updates_per_page[0].1;
|
||||
let first_update = lsn_page_pairs
|
||||
@@ -208,35 +211,50 @@ async fn main() -> Result<()> {
|
||||
// TODO concurrent requests: multiple reads, also writes.
|
||||
|
||||
// Do some warmup
|
||||
let _page = get_page(&mut socket, &first_update, &hottest_page).await?;
|
||||
let mut prev_page = get_page(&mut socket, &first_update, &hottest_page).await?;
|
||||
|
||||
let mut results: Vec<(Lsn, Duration)> = vec![];
|
||||
let mut results: Vec<(Lsn, (Duration, usize))> = vec![];
|
||||
for (i, (lsn, _pages)) in writes_per_entry.iter().enumerate() {
|
||||
if lsn >= first_update {
|
||||
|
||||
// Just to speed up things
|
||||
if i % 1000 != 0 {
|
||||
continue
|
||||
}
|
||||
// if i % 1000 != 0 {
|
||||
// continue
|
||||
// }
|
||||
|
||||
// println!("Running get_page {:?} at {:?}", hottest_page, lsn);
|
||||
let start = Instant::now();
|
||||
let _page = get_page(&mut socket, &lsn, &hottest_page).await?;
|
||||
let page = get_page(&mut socket, &lsn, &hottest_page).await?;
|
||||
let duration = start.elapsed();
|
||||
results.push((lsn.clone(), duration));
|
||||
|
||||
// TODO why is most modified page constant? Is this test correct?
|
||||
let modified_bits = {
|
||||
let mut modified_bits = 0;
|
||||
for byte_idx in 0..BYTES_IN_PAGE {
|
||||
let xor = page[byte_idx] ^ prev_page[byte_idx];
|
||||
modified_bits = xor.count_ones() as usize;
|
||||
}
|
||||
modified_bits
|
||||
};
|
||||
prev_page = page;
|
||||
|
||||
results.push((lsn.clone(), (duration, modified_bits)));
|
||||
// println!("Time: {:?}", duration);
|
||||
}
|
||||
}
|
||||
results.sort();
|
||||
let x: Vec<_> = results.iter().map(|(lsn, _)| lsn.0).collect();
|
||||
let y: Vec<_> = results.iter().map(|(_, duration)| duration.as_micros()).collect();
|
||||
let y: Vec<_> = results.iter().map(|(_, (duration, _))| duration.as_micros()).collect();
|
||||
let z: Vec<_> = results.iter().map(|(_, (_, modified))| modified.clone()).collect();
|
||||
|
||||
use plotly::{Plot, Scatter};
|
||||
use plotly::common::Mode;
|
||||
let get_page_trace = Scatter::new(x, y).name("get_page").mode(Mode::Lines);
|
||||
let get_page_trace = Scatter::new(x.clone(), y).name("get_page").mode(Mode::Lines);
|
||||
let modified_bits_trace = Scatter::new(x, z).name("modified_bits").mode(Mode::Lines);
|
||||
|
||||
let mut plot = Plot::new();
|
||||
plot.add_trace(get_page_trace);
|
||||
// plot.add_trace(get_page_trace);
|
||||
plot.add_trace(modified_bits_trace);
|
||||
plot.show();
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -618,7 +618,6 @@ class ZenithEnv:
|
||||
ps_connstr = self.pageserver.connstr()
|
||||
psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
|
||||
tenant_hex = self.initial_tenant.hex
|
||||
print("AAAAAAAA", ps_connstr)
|
||||
args = [psbench_binpath, ps_log_filename, ps_connstr, tenant_hex, timeline]
|
||||
subprocess.run(args)
|
||||
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
from contextlib import closing
|
||||
from fixtures.zenith_fixtures import ZenithEnv
|
||||
from fixtures.zenith_fixtures import ZenithEnv, PgBin
|
||||
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
|
||||
|
||||
|
||||
def test_get_page(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker):
|
||||
def test_get_page(zenith_simple_env: ZenithEnv,
|
||||
zenbenchmark: ZenithBenchmarker,
|
||||
pg_bin: PgBin):
|
||||
env = zenith_simple_env
|
||||
env.zenith_cli.create_branch("test_pageserver", "empty")
|
||||
pg = env.postgres.create_start('test_pageserver')
|
||||
@@ -16,17 +18,16 @@ def test_get_page(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker)
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('create table t (i integer);')
|
||||
cur.execute('insert into t values (0);')
|
||||
|
||||
for i in range(100000):
|
||||
cur.execute(f'update t set i = {i};')
|
||||
workload = "pgbench"
|
||||
if workload == "hot page":
|
||||
cur.execute('create table t (i integer);')
|
||||
cur.execute('insert into t values (0);')
|
||||
for i in range(100000):
|
||||
cur.execute(f'update t set i = {i};')
|
||||
elif workload == "pgbench":
|
||||
pg_bin.run_capture(['pgbench', '-s5', '-i', pg.connstr()])
|
||||
pg_bin.run_capture(['pgbench', '-c1', '-t5000', pg.connstr()])
|
||||
|
||||
pscur.execute(f"checkpoint {env.initial_tenant.hex} {timeline} 0")
|
||||
|
||||
cur.execute("select * from t;")
|
||||
res = cur.fetchall()
|
||||
print("AAAA")
|
||||
print(res)
|
||||
|
||||
env.run_psbench(timeline)
|
||||
|
||||
Reference in New Issue
Block a user