From 92016ff9b1b4ccbb78d6ff3d202065443385794c Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Tue, 21 Nov 2023 12:45:21 -0500 Subject: [PATCH] wip --- pageserver/src/import_datadir.rs | 34 +++++++++++++++----------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index bb21713fc0..91da9225d6 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -626,22 +626,21 @@ mod tests { #[tokio::test] async fn test_basic() -> anyhow::Result<()> { + // Bootstrap a real timeline. We can't use create_test_timeline because + // it doesn't create a real checkpoint, and Walingest::new tries to parse + // the garbage data. let pg_version = 15; let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await; - - // We can't use create_test_timeline because it doesn't create a real - // checkpoint, and Walingest::new tries to parse the garbage data. let tline = tenant .bootstrap_timeline(TIMELINE_ID, pg_version, &ctx) .await?; - // Steps to reconstruct this test data: + // Get test data. Steps to reconstruct it, if needed: // 1. Run the pgbench python test // 2. Take the first wal segment file from safekeeper // 3. Grep sk logs for "restart decoder" to get startpoint // 4. Run just the decoder from this test to get the endpoint. // It's the last LSN the decoder will output. - let pg_version = 15; let path = "test_data/sk_wal_segment_from_pgbench"; let startpoint = Lsn::from_hex("14AEC08").unwrap(); let endpoint = Lsn::from_hex("1FFFF98").unwrap(); @@ -654,24 +653,23 @@ mod tests { let prof_guard = crate::profiling::profpoint_start(); let started_at = std::time::Instant::now(); - // Feed bytes to the decoder - // TODO try feeding in many small chunks + // Initialize walingest let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE); let mut decoder = WalStreamDecoder::new(startpoint, pg_version); - // decoder.feed_bytes(&bytes[xlogoff..]); - for chunk in bytes[xlogoff..].chunks(50) { - decoder.feed_bytes(chunk); - } - println!("decoding {} bytes", bytes.len() - xlogoff); - - // Decode and ingest wal let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx).await?; let mut modification = tline.begin_modification(endpoint); let mut decoded = DecodedWALRecord::default(); - while let Some((lsn, recdata)) = decoder.poll_decode()? { - walingest - .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) - .await?; + println!("decoding {} bytes", bytes.len() - xlogoff); + + // Decode and ingest wal. We process the wal in chunks because + // that's what happens when we get bytes from safekeepers. + for chunk in bytes[xlogoff..].chunks(50) { + decoder.feed_bytes(chunk); + while let Some((lsn, recdata)) = decoder.poll_decode()? { + walingest + .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) + .await?; + } } let duration = started_at.elapsed();