Repeat 1000 times

This commit is contained in:
Bojan Serafimov
2023-11-28 13:13:21 -05:00
parent 0671fdd265
commit 922c4f07d5

View File

@@ -2107,21 +2107,6 @@ mod tests {
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
// Bootstrap a real timeline. We can't use create_test_timeline because
// it doesn't create a real checkpoint, and Walingest::new tries to parse
// the garbage data.
//
// TODO use the initdb.tar.zst file stored with the test data to avoid
// problems with inconsistent initdb results after pg minor version bumps.
let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
.unwrap()
.load()
.await;
let tline = tenant
.bootstrap_timeline(TIMELINE_ID, pg_version, &ctx)
.await
.unwrap();
// We fully read and decompress this into memory before decoding
// to get a more accurate perf profile of the decoder.
let bytes = {
@@ -2135,38 +2120,57 @@ mod tests {
buffer
};
// Start profiling
let profiler_guard = crate::profiling::init_profiler();
let prof_guard = crate::profiling::profpoint_start();
let started_at = std::time::Instant::now();
let n_iterations = 1000;
let profiler = crate::profiling::init_profiler();
// Initialize walingest
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
.await
.unwrap();
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
println!("decoding {} bytes", bytes.len() - xlogoff);
for iteration in 0..n_iterations {
// Bootstrap a real timeline. We can't use create_test_timeline because
// it doesn't create a real checkpoint, and Walingest::new tries to parse
// the garbage data.
//
// TODO use the initdb.tar.zst file stored with the test data to avoid
// problems with inconsistent initdb results after pg minor version bumps.
let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
.unwrap()
.load()
.await;
let tline = tenant
.bootstrap_timeline(TIMELINE_ID, pg_version, &ctx)
.await
.unwrap();
// Decode and ingest wal. We process the wal in chunks because
// that's what happens when we get bytes from safekeepers.
for chunk in bytes[xlogoff..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.unwrap();
// Initialize walingest
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
.await
.unwrap();
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
println!("decoding {} bytes", bytes.len() - xlogoff);
// Start profiling
let prof_guard = crate::profiling::profpoint_start();
let started_at = std::time::Instant::now();
// Decode and ingest wal. We process the wal in chunks because
// that's what happens when we get bytes from safekeepers.
for chunk in bytes[xlogoff..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.unwrap();
}
}
drop(prof_guard);
let duration = started_at.elapsed();
println!("done iteration {} in {:?}", iteration, duration);
}
drop(prof_guard);
let duration = started_at.elapsed();
println!("done in {:?}", duration);
crate::profiling::exit_profiler(&profiler_guard);
crate::profiling::exit_profiler(&profiler);
}
}