This commit is contained in:
Bojan Serafimov
2023-11-21 12:45:21 -05:00
parent c4af96ee7e
commit 92016ff9b1

View File

@@ -626,22 +626,21 @@ mod tests {
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
// Bootstrap a real timeline. We can't use create_test_timeline because
// it doesn't create a real checkpoint, and Walingest::new tries to parse
// the garbage data.
let pg_version = 15;
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
// We can't use create_test_timeline because it doesn't create a real
// checkpoint, and Walingest::new tries to parse the garbage data.
let tline = tenant
.bootstrap_timeline(TIMELINE_ID, pg_version, &ctx)
.await?;
// Steps to reconstruct this test data:
// Get test data. Steps to reconstruct it, if needed:
// 1. Run the pgbench python test
// 2. Take the first wal segment file from safekeeper
// 3. Grep sk logs for "restart decoder" to get startpoint
// 4. Run just the decoder from this test to get the endpoint.
// It's the last LSN the decoder will output.
let pg_version = 15;
let path = "test_data/sk_wal_segment_from_pgbench";
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
@@ -654,24 +653,23 @@ mod tests {
let prof_guard = crate::profiling::profpoint_start();
let started_at = std::time::Instant::now();
// Feed bytes to the decoder
// TODO try feeding in many small chunks
// Initialize walingest
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
// decoder.feed_bytes(&bytes[xlogoff..]);
for chunk in bytes[xlogoff..].chunks(50) {
decoder.feed_bytes(chunk);
}
println!("decoding {} bytes", bytes.len() - xlogoff);
// Decode and ingest wal
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx).await?;
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
while let Some((lsn, recdata)) = decoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await?;
println!("decoding {} bytes", bytes.len() - xlogoff);
// Decode and ingest wal. We process the wal in chunks because
// that's what happens when we get bytes from safekeepers.
for chunk in bytes[xlogoff..].chunks(50) {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await?;
}
}
let duration = started_at.elapsed();