diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d75f61d8b3..bc8779b26f 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2926,7 +2926,7 @@ impl Tenant { /// - after initialization completes, tar up the temp dir and upload it to S3. /// /// The caller is responsible for activating the returned timeline. - async fn bootstrap_timeline( + pub(crate) async fn bootstrap_timeline( &self, timeline_id: TimelineId, pg_version: u32, diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 23367928d3..a9173b41e9 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -2079,4 +2079,88 @@ mod tests { Ok(()) } + + /// Replay a WAL segment file taken directly from safekeepers. + /// + /// This test is useful for benchmarking since it allows us to profile only + /// the walingest code in a single-threaded executor, and iterate more quickly + /// without waiting for unrelated steps. + #[tokio::test] + async fn test_ingest_real_wal() { + use crate::tenant::harness::*; + use postgres_ffi::waldecoder::WalStreamDecoder; + use postgres_ffi::WAL_SEGMENT_SIZE; + + // Define test data path and constants. + // + // Steps to reconstruct the data, if needed: + // 1. Run the pgbench python test + // 2. Take the first WAL segment file from a safekeeper + // 3. Compress it using `zstd --long input_file` + // 4. Copy initdb.tar.zst from local_fs_remote_storage + // 5. Grep safekeeper logs for "restart decoder" to get startpoint + // 6. Run just the decoder from this test to get the endpoint. + // It's the last LSN the decoder will output. + let pg_version = 15; // The test data was generated by pg15 + let path = "test_data/sk_wal_segment_from_pgbench"; + let wal_segment_path = format!("{path}/000000010000000000000001.zst"); + let startpoint = Lsn::from_hex("14AEC08").unwrap(); + let endpoint = Lsn::from_hex("1FFFF98").unwrap(); + + // Bootstrap a real timeline. 
We can't use create_test_timeline because + // it doesn't create a real checkpoint, and WalIngest::new tries to parse + // the garbage data. + // + // TODO use the initdb.tar.zst file stored with the test data to avoid + // problems with inconsistent initdb results after pg minor version bumps. + let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal") + .unwrap() + .load() + .await; + let tline = tenant + .bootstrap_timeline(TIMELINE_ID, pg_version, &ctx) + .await + .unwrap(); + + // We fully read and decompress this into memory before decoding + // to get a more accurate perf profile of the decoder. + let bytes = { + use async_compression::tokio::bufread::ZstdDecoder; + let file = tokio::fs::File::open(wal_segment_path).await.unwrap(); + let reader = tokio::io::BufReader::new(file); + let decoder = ZstdDecoder::new(reader); + let mut reader = tokio::io::BufReader::new(decoder); + let mut buffer = Vec::new(); + tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap(); + buffer + }; + + // TODO start a profiler too + let started_at = std::time::Instant::now(); + + // Initialize walingest + let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE); + let mut decoder = WalStreamDecoder::new(startpoint, pg_version); + let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx) + .await + .unwrap(); + let mut modification = tline.begin_modification(endpoint); + let mut decoded = DecodedWALRecord::default(); + println!("decoding {} bytes", bytes.len() - xlogoff); + + // Decode and ingest WAL. We process the WAL in chunks because + // that's what happens when we get bytes from safekeepers. 
+ for chunk in bytes[xlogoff..].chunks(50) { + decoder.feed_bytes(chunk); + while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() { + walingest + .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) + .await + .unwrap(); + } + } + + let duration = started_at.elapsed(); + println!("done in {:?}", duration); + } } diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 5d8cc0e181..4e684dec2d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -1182,7 +1182,7 @@ mod tests { #[tokio::test] async fn short_v14_redo() { - let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap(); + let expected = std::fs::read("test_data/short_v14_redo.page").unwrap(); let h = RedoHarness::new().unwrap(); diff --git a/pageserver/fixtures/short_v14_redo.page b/pageserver/test_data/short_v14_redo.page similarity index 100% rename from pageserver/fixtures/short_v14_redo.page rename to pageserver/test_data/short_v14_redo.page diff --git a/pageserver/test_data/sk_wal_segment_from_pgbench/000000010000000000000001.zst b/pageserver/test_data/sk_wal_segment_from_pgbench/000000010000000000000001.zst new file mode 100644 index 0000000000..3c478e7827 Binary files /dev/null and b/pageserver/test_data/sk_wal_segment_from_pgbench/000000010000000000000001.zst differ diff --git a/pageserver/test_data/sk_wal_segment_from_pgbench/initdb.tar.zst b/pageserver/test_data/sk_wal_segment_from_pgbench/initdb.tar.zst new file mode 100644 index 0000000000..17e9c7ea08 Binary files /dev/null and b/pageserver/test_data/sk_wal_segment_from_pgbench/initdb.tar.zst differ