neondatabase/neon (https://github.com/neondatabase/neon.git)
Add walingest test (#5892)
@@ -2926,7 +2926,7 @@ impl Tenant {
     /// - after initialization completes, tar up the temp dir and upload it to S3.
     ///
     /// The caller is responsible for activating the returned timeline.
-    async fn bootstrap_timeline(
+    pub(crate) async fn bootstrap_timeline(
         &self,
         timeline_id: TimelineId,
         pg_version: u32,
@@ -2079,4 +2079,88 @@ mod tests {
 
         Ok(())
     }
+
+    /// Replay a wal segment file taken directly from safekeepers.
+    ///
+    /// This test is useful for benchmarking since it allows us to profile only
+    /// the walingest code in a single-threaded executor, and iterate more quickly
+    /// without waiting for unrelated steps.
+    #[tokio::test]
+    async fn test_ingest_real_wal() {
+        use crate::tenant::harness::*;
+        use postgres_ffi::waldecoder::WalStreamDecoder;
+        use postgres_ffi::WAL_SEGMENT_SIZE;
+
+        // Define test data path and constants.
+        //
+        // Steps to reconstruct the data, if needed:
+        // 1. Run the pgbench python test
+        // 2. Take the first wal segment file from safekeeper
+        // 3. Compress it using `zstd --long input_file`
+        // 4. Copy initdb.tar.zst from local_fs_remote_storage
+        // 5. Grep sk logs for "restart decoder" to get startpoint
+        // 6. Run just the decoder from this test to get the endpoint.
+        //    It's the last LSN the decoder will output.
+        let pg_version = 15; // The test data was generated by pg15
+        let path = "test_data/sk_wal_segment_from_pgbench";
+        let wal_segment_path = format!("{path}/000000010000000000000001.zst");
+        let startpoint = Lsn::from_hex("14AEC08").unwrap();
+        let endpoint = Lsn::from_hex("1FFFF98").unwrap();
+
+        // Bootstrap a real timeline. We can't use create_test_timeline because
+        // it doesn't create a real checkpoint, and Walingest::new tries to parse
+        // the garbage data.
+        //
+        // TODO use the initdb.tar.zst file stored with the test data to avoid
+        // problems with inconsistent initdb results after pg minor version bumps.
+        let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")
+            .unwrap()
+            .load()
+            .await;
+        let tline = tenant
+            .bootstrap_timeline(TIMELINE_ID, pg_version, &ctx)
+            .await
+            .unwrap();
+
+        // We fully read and decompress this into memory before decoding
+        // to get a more accurate perf profile of the decoder.
+        let bytes = {
+            use async_compression::tokio::bufread::ZstdDecoder;
+            let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
+            let reader = tokio::io::BufReader::new(file);
+            let decoder = ZstdDecoder::new(reader);
+            let mut reader = tokio::io::BufReader::new(decoder);
+            let mut buffer = Vec::new();
+            tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
+            buffer
+        };
+
+        // TODO start a profiler too
+        let started_at = std::time::Instant::now();
+
+        // Initialize walingest
+        let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
+        let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
+        let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
+            .await
+            .unwrap();
+        let mut modification = tline.begin_modification(endpoint);
+        let mut decoded = DecodedWALRecord::default();
+        println!("decoding {} bytes", bytes.len() - xlogoff);
+
+        // Decode and ingest wal. We process the wal in chunks because
+        // that's what happens when we get bytes from safekeepers.
+        for chunk in bytes[xlogoff..].chunks(50) {
+            decoder.feed_bytes(chunk);
+            while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
+                walingest
+                    .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
+                    .await
+                    .unwrap();
+            }
+        }
+
+        let duration = started_at.elapsed();
+        println!("done in {:?}", duration);
+    }
 }
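Step 6 of the data-reconstruction comment above ("run just the decoder from this test to get the endpoint") can be done with a small standalone helper. The sketch below is not part of the commit: the function name `find_endpoint` and the `utils::lsn::Lsn` import path are assumptions, and it only reuses calls that appear in the test itself (`WalStreamDecoder::new`, `feed_bytes`, `poll_decode`, `Lsn::segment_offset`).

    // Hypothetical helper, not part of the commit: decode a decompressed WAL
    // segment starting at `startpoint` and return the last LSN the decoder
    // outputs, which is the `endpoint` value hard-coded in the test above.
    use postgres_ffi::waldecoder::WalStreamDecoder;
    use postgres_ffi::WAL_SEGMENT_SIZE;
    use utils::lsn::Lsn; // assumed import path for Lsn

    fn find_endpoint(bytes: &[u8], startpoint: Lsn, pg_version: u32) -> Option<Lsn> {
        // Skip to the startpoint's offset within the segment; with the default
        // 16 MiB segment size, startpoint 0x14AEC08 gives an offset of 0x4AEC08.
        let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
        let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
        let mut endpoint = None;
        // Feed small chunks, mirroring how bytes arrive from safekeepers.
        for chunk in bytes[xlogoff..].chunks(50) {
            decoder.feed_bytes(chunk);
            while let Some((lsn, _recdata)) = decoder.poll_decode().unwrap() {
                endpoint = Some(lsn);
            }
        }
        endpoint
    }

Printing the returned LSN gives the value that the test hard-codes as `endpoint` via `Lsn::from_hex`.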
@@ -1182,7 +1182,7 @@ mod tests {
 
     #[tokio::test]
     async fn short_v14_redo() {
-        let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap();
+        let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
 
         let h = RedoHarness::new().unwrap();
BIN  pageserver/test_data/sk_wal_segment_from_pgbench/initdb.tar.zst  (new file; binary file not shown)