mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
## Problem Once we use async file system APIs for `VirtualFile`, these functions will also need to be async fn. ## Summary of changes Makes the functions `open, open_with_options, create,sync_all,with_file` of `VirtualFile` async fn, including all functions that call it. Like in the prior PRs, the actual I/O operations are not using async APIs yet, as per request in the #4743 epic. We switch towards not using `VirtualFile` in the par_fsync module, hopefully this is only temporary until we can actually do fully async I/O in `VirtualFile`. This might cause us to exhaust fd limits in the tests, but it should only be an issue for the local developer as we have high ulimits in prod. This PR is a follow-up of #5189, #5190, #5195, and #5203. Part of #4743.
170 lines
5.6 KiB
Rust
170 lines
5.6 KiB
Rust
use std::path::{Path, PathBuf};
|
|
|
|
use anyhow::Result;
|
|
use clap::Subcommand;
|
|
use pageserver::tenant::block_io::BlockCursor;
|
|
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
|
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
|
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
|
use pageserver::{page_cache, virtual_file};
|
|
use pageserver::{
|
|
repository::{Key, KEY_SIZE},
|
|
tenant::{
|
|
block_io::FileBlockReader, disk_btree::VisitDirection,
|
|
storage_layer::delta_layer::DELTA_KEY_SIZE,
|
|
},
|
|
virtual_file::VirtualFile,
|
|
};
|
|
use std::fs;
|
|
use utils::bin_ser::BeSer;
|
|
|
|
use crate::layer_map_analyzer::parse_filename;
|
|
|
|
#[derive(Subcommand)]
|
|
pub(crate) enum LayerCmd {
|
|
/// List all tenants and timelines under the pageserver path
|
|
///
|
|
/// Example: `cargo run --bin pagectl layer list .neon/`
|
|
List { path: PathBuf },
|
|
/// List all layers of a given tenant and timeline
|
|
///
|
|
/// Example: `cargo run --bin pagectl layer list .neon/`
|
|
ListLayer {
|
|
path: PathBuf,
|
|
tenant: String,
|
|
timeline: String,
|
|
},
|
|
/// Dump all information of a layer file
|
|
DumpLayer {
|
|
path: PathBuf,
|
|
tenant: String,
|
|
timeline: String,
|
|
/// The id from list-layer command
|
|
id: usize,
|
|
},
|
|
}
|
|
|
|
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
|
let path = path.as_ref();
|
|
virtual_file::init(10);
|
|
page_cache::init(100);
|
|
let file = FileBlockReader::new(VirtualFile::open(path).await?);
|
|
let summary_blk = file.read_blk(0).await?;
|
|
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
|
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
|
actual_summary.index_start_blk,
|
|
actual_summary.index_root_blk,
|
|
&file,
|
|
);
|
|
// TODO(chi): dedup w/ `delta_layer.rs` by exposing the API.
|
|
let mut all = vec![];
|
|
tree_reader
|
|
.visit(
|
|
&[0u8; DELTA_KEY_SIZE],
|
|
VisitDirection::Forwards,
|
|
|key, value_offset| {
|
|
let curr = Key::from_slice(&key[..KEY_SIZE]);
|
|
all.push((curr, BlobRef(value_offset)));
|
|
true
|
|
},
|
|
)
|
|
.await?;
|
|
let cursor = BlockCursor::new_fileblockreader(&file);
|
|
for (k, v) in all {
|
|
let value = cursor.read_blob(v.pos()).await?;
|
|
println!("key:{} value_len:{}", k, value.len());
|
|
}
|
|
// TODO(chi): special handling for last key?
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
|
match cmd {
|
|
LayerCmd::List { path } => {
|
|
for tenant in fs::read_dir(path.join(TENANTS_SEGMENT_NAME))? {
|
|
let tenant = tenant?;
|
|
if !tenant.file_type()?.is_dir() {
|
|
continue;
|
|
}
|
|
println!("tenant {}", tenant.file_name().to_string_lossy());
|
|
for timeline in fs::read_dir(tenant.path().join(TIMELINES_SEGMENT_NAME))? {
|
|
let timeline = timeline?;
|
|
if !timeline.file_type()?.is_dir() {
|
|
continue;
|
|
}
|
|
println!("- timeline {}", timeline.file_name().to_string_lossy());
|
|
}
|
|
}
|
|
}
|
|
LayerCmd::ListLayer {
|
|
path,
|
|
tenant,
|
|
timeline,
|
|
} => {
|
|
let timeline_path = path
|
|
.join(TENANTS_SEGMENT_NAME)
|
|
.join(tenant)
|
|
.join(TIMELINES_SEGMENT_NAME)
|
|
.join(timeline);
|
|
let mut idx = 0;
|
|
for layer in fs::read_dir(timeline_path)? {
|
|
let layer = layer?;
|
|
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
|
|
{
|
|
println!(
|
|
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
|
idx,
|
|
layer_file.key_range.start,
|
|
layer_file.key_range.end,
|
|
layer_file.lsn_range.start,
|
|
layer_file.lsn_range.end,
|
|
layer_file.is_delta,
|
|
);
|
|
idx += 1;
|
|
}
|
|
}
|
|
}
|
|
LayerCmd::DumpLayer {
|
|
path,
|
|
tenant,
|
|
timeline,
|
|
id,
|
|
} => {
|
|
let timeline_path = path
|
|
.join("tenants")
|
|
.join(tenant)
|
|
.join("timelines")
|
|
.join(timeline);
|
|
let mut idx = 0;
|
|
for layer in fs::read_dir(timeline_path)? {
|
|
let layer = layer?;
|
|
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
|
|
{
|
|
if *id == idx {
|
|
// TODO(chi): dedup code
|
|
println!(
|
|
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
|
idx,
|
|
layer_file.key_range.start,
|
|
layer_file.key_range.end,
|
|
layer_file.lsn_range.start,
|
|
layer_file.lsn_range.end,
|
|
layer_file.is_delta,
|
|
);
|
|
|
|
if layer_file.is_delta {
|
|
read_delta_file(layer.path()).await?;
|
|
} else {
|
|
anyhow::bail!("not supported yet :(");
|
|
}
|
|
|
|
break;
|
|
}
|
|
idx += 1;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|