Add walingest test

This commit is contained in:
Bojan Serafimov
2023-11-21 13:05:39 -05:00
parent 0d10992e46
commit 128176702e
3 changed files with 63 additions and 1 deletions

View File

@@ -2878,7 +2878,7 @@ impl Tenant {
/// - after initialization complete, remove the temp dir.
///
/// The caller is responsible for activating the returned timeline.
async fn bootstrap_timeline(
pub async fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,

View File

@@ -2079,4 +2079,66 @@ mod tests {
Ok(())
}
/// Replay a wal segment file taken directly from safekeepers.
///
/// This test is useful for benchmarking since it allows us to profile only
/// the walingest code in a single-threaded executor, and iterate more quickly
/// without waiting for unrelated steps.
#[tokio::test]
async fn test_ingest_real_wal() -> anyhow::Result<()> {
use crate::tenant::harness::*;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::WAL_SEGMENT_SIZE;
// Bootstrap a real timeline. We can't use create_test_timeline because
// it doesn't create a real checkpoint, and Walingest::new tries to parse
// the garbage data.
let pg_version = 15;
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
let tline = tenant
.bootstrap_timeline(TIMELINE_ID, pg_version, &ctx)
.await?;
// Get test data. Steps to reconstruct it, if needed:
// 1. Run the pgbench python test
// 2. Take the first wal segment file from safekeeper
// 3. Grep sk logs for "restart decoder" to get startpoint
// 4. Run just the decoder from this test to get the endpoint.
// It's the last LSN the decoder will output.
let path = "test_data/sk_wal_segment_from_pgbench";
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
// We fully read this into memory before decoding to get a
// more accurate perf profile of the decoder.
let bytes = std::fs::read(path)?;
// TODO start a profiler too
let started_at = std::time::Instant::now();
// Initialize walingest
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx).await?;
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
println!("decoding {} bytes", bytes.len() - xlogoff);
// Decode and ingest wal. We process the wal in chunks because
// that's what happens when we get bytes from safekeepers.
for chunk in bytes[xlogoff..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await?;
}
}
let duration = started_at.elapsed();
println!("done in {:?}", duration);
Ok(())
}
}

Binary file not shown.