diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index ce911a519a..6ab8fc4e3f 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -2086,7 +2086,7 @@ mod tests { /// the walingest code in a single-threaded executor, and iterate more quickly /// without waiting for unrelated steps. #[tokio::test] - async fn test_ingest_real_wal() -> anyhow::Result<()> { + async fn test_ingest_real_wal() { use crate::tenant::harness::*; use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::WAL_SEGMENT_SIZE; @@ -2095,24 +2095,37 @@ mod tests { // it doesn't create a real checkpoint, and Walingest::new tries to parse // the garbage data. let pg_version = 15; - let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal")?.load().await; + let (tenant, ctx) = TenantHarness::create("test_ingest_real_wal") + .unwrap() + .load() + .await; let tline = tenant .bootstrap_timeline(TIMELINE_ID, pg_version, &ctx) - .await?; + .await + .unwrap(); // Get test data. Steps to reconstruct it, if needed: // 1. Run the pgbench python test // 2. Take the first wal segment file from safekeeper - // 3. Grep sk logs for "restart decoder" to get startpoint - // 4. Run just the decoder from this test to get the endpoint. + // 3. Compress it using `gzip -c input_file > output_file.gz` + // 4. Grep sk logs for "restart decoder" to get startpoint + // 5. Run just the decoder from this test to get the endpoint. // It's the last LSN the decoder will output. - let path = "test_data/sk_wal_segment_from_pgbench"; + let path = "test_data/sk_wal_segment_from_pgbench.gz"; let startpoint = Lsn::from_hex("14AEC08").unwrap(); let endpoint = Lsn::from_hex("1FFFF98").unwrap(); - // We fully read this into memory before decoding to get a - // more accurate perf profile of the decoder. - let bytes = std::fs::read(path)?; + // We fully read and decompress this into memory before decoding + // to get a more accurate perf profile of the decoder. + let bytes = { + use std::io::Read; + let file_reader = std::fs::File::open(path).unwrap(); + let buffered_reader = std::io::BufReader::new(file_reader); + let mut decoder = flate2::read::GzDecoder::new(buffered_reader); + let mut buffer: Vec = vec![]; + decoder.read_to_end(&mut buffer).unwrap(); + buffer + }; // TODO start a profiler too let started_at = std::time::Instant::now(); @@ -2120,7 +2133,9 @@ mod tests { // Initialize walingest let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE); let mut decoder = WalStreamDecoder::new(startpoint, pg_version); - let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx).await?; + let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx) + .await + .unwrap(); let mut modification = tline.begin_modification(endpoint); let mut decoded = DecodedWALRecord::default(); println!("decoding {} bytes", bytes.len() - xlogoff); @@ -2129,16 +2144,15 @@ mod tests { // that's what happens when we get bytes from safekeepers. for chunk in bytes[xlogoff..].chunks(50) { decoder.feed_bytes(chunk); - while let Some((lsn, recdata)) = decoder.poll_decode()? { + while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() { walingest .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) - .await?; + .await + .unwrap(); } } let duration = started_at.elapsed(); println!("done in {:?}", duration); - - Ok(()) } } diff --git a/pageserver/test_data/sk_wal_segment_from_pgbench b/pageserver/test_data/sk_wal_segment_from_pgbench deleted file mode 100644 index 71190be5f3..0000000000 Binary files a/pageserver/test_data/sk_wal_segment_from_pgbench and /dev/null differ diff --git a/pageserver/test_data/sk_wal_segment_from_pgbench.gz b/pageserver/test_data/sk_wal_segment_from_pgbench.gz new file mode 100644 index 0000000000..29f713ff8b Binary files /dev/null and b/pageserver/test_data/sk_wal_segment_from_pgbench.gz differ