mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
Fix DeltaLayer dumping (#5045)
## Problem Before, DeltaLayer dumping (via `cargo run --release -p pagectl -- print-layer-file` ) would crash as one can't call `Handle::block_on` in an async executor thread. ## Summary of changes Avoid the problem by using `DeltaLayerInner::load_keys` to load the keys into RAM (which we already do during compaction), and then load the values one by one during dumping.
This commit is contained in:
@@ -3951,6 +3951,31 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delta_layer_dumping() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
let layer_map = tline.layers.read().await;
|
||||
let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
|
||||
|
||||
assert!(!level0_deltas.is_empty());
|
||||
|
||||
for delta in level0_deltas {
|
||||
let delta = layer_map.get_from_desc(&delta);
|
||||
// Ensure we are dumping a delta layer here
|
||||
let delta = delta.downcast_delta_layer().unwrap();
|
||||
|
||||
delta.dump(false, &ctx).await.unwrap();
|
||||
delta.dump(true, &ctx).await.unwrap();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn corrupt_metadata() -> anyhow::Result<()> {
|
||||
const TEST_NAME: &str = "corrupt_metadata";
|
||||
|
||||
@@ -51,7 +51,6 @@ use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::OnceCell;
|
||||
use tracing::*;
|
||||
|
||||
@@ -177,10 +176,6 @@ impl DeltaKey {
|
||||
Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
|
||||
}
|
||||
|
||||
fn extract_key_from_buf(buf: &[u8]) -> Key {
|
||||
Key::from_slice(&buf[..KEY_SIZE])
|
||||
}
|
||||
|
||||
fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
|
||||
let mut lsn_buf = [0u8; 8];
|
||||
lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
|
||||
@@ -277,48 +272,42 @@ impl Layer for DeltaLayer {
|
||||
|
||||
tree_reader.dump().await?;
|
||||
|
||||
let cursor = file.block_cursor();
|
||||
let keys = DeltaLayerInner::load_keys(&Ref(&**inner)).await?;
|
||||
|
||||
// A subroutine to dump a single blob
|
||||
let dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
|
||||
// TODO this is not ideal, but on the other hand we are in dumping code...
|
||||
let buf = Handle::current().block_on(cursor.read_blob(blob_ref.pos()))?;
|
||||
let val = Value::des(&buf)?;
|
||||
let desc = match val {
|
||||
Value::Image(img) => {
|
||||
format!(" img {} bytes", img.len())
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec)?;
|
||||
format!(
|
||||
" rec {} bytes will_init: {} {}",
|
||||
buf.len(),
|
||||
rec.will_init(),
|
||||
wal_desc
|
||||
)
|
||||
}
|
||||
};
|
||||
Ok(desc)
|
||||
let dump_blob = |val: ValueRef<_>| -> _ {
|
||||
async move {
|
||||
let buf = val.reader.read_blob(val.blob_ref.pos()).await?;
|
||||
let val = Value::des(&buf)?;
|
||||
let desc = match val {
|
||||
Value::Image(img) => {
|
||||
format!(" img {} bytes", img.len())
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec)?;
|
||||
format!(
|
||||
" rec {} bytes will_init: {} {}",
|
||||
buf.len(),
|
||||
rec.will_init(),
|
||||
wal_desc
|
||||
)
|
||||
}
|
||||
};
|
||||
Ok(desc)
|
||||
}
|
||||
};
|
||||
|
||||
tree_reader
|
||||
.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|delta_key, val| {
|
||||
let blob_ref = BlobRef(val);
|
||||
let key = DeltaKey::extract_key_from_buf(delta_key);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(delta_key);
|
||||
|
||||
let desc = match dump_blob(blob_ref) {
|
||||
Ok(desc) => desc,
|
||||
Err(err) => format!("ERROR: {}", err),
|
||||
};
|
||||
println!(" key {} at {}: {}", key, lsn, desc);
|
||||
true
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
for entry in keys {
|
||||
let DeltaEntry { key, lsn, val, .. } = entry;
|
||||
let desc = match dump_blob(val).await {
|
||||
Ok(desc) => desc,
|
||||
Err(err) => {
|
||||
let err: anyhow::Error = err;
|
||||
format!("ERROR: {err}")
|
||||
}
|
||||
};
|
||||
println!(" key {key} at {lsn}: {desc}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user