pagectl: add subcommand to rewrite layer file history

This commit is contained in:
Christian Schwarz
2023-11-27 14:53:13 +00:00
parent a76a503b8b
commit ae2c7589f9
4 changed files with 196 additions and 18 deletions

View File

@@ -1,13 +1,15 @@
use std::path::{Path, PathBuf};
use anyhow::Result;
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use clap::Subcommand;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::{page_cache, virtual_file};
use pageserver::{
@@ -20,6 +22,7 @@ use pageserver::{
};
use std::fs;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::parse_filename;
@@ -45,6 +48,13 @@ pub(crate) enum LayerCmd {
/// The id from list-layer command
id: usize,
},
RewriteSummary {
layer_file_path: Utf8PathBuf,
#[clap(long)]
new_tenant_id: Option<TenantId>,
#[clap(long)]
new_timeline_id: Option<TimelineId>,
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -100,6 +110,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
println!("- timeline {}", timeline.file_name().to_string_lossy());
}
}
Ok(())
}
LayerCmd::ListLayer {
path,
@@ -128,6 +139,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::DumpLayer {
path,
@@ -168,7 +180,63 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::RewriteSummary {
layer_file_path,
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
macro_rules! rewrite_closure {
($($summary_ty:tt)*) => {{
|summary| $($summary_ty)* {
tenant_id: new_tenant_id.unwrap_or(summary.tenant_id),
timeline_id: new_timeline_id.unwrap_or(summary.timeline_id),
..summary
}
}};
}
let res = ImageLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(image_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of image layer {layer_file_path}");
return Ok(());
}
Err(image_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
Err(image_layer::RewriteSummaryError::Other(e)) => {
return Err(e);
}
}
let res = DeltaLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(delta_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of delta layer {layer_file_path}");
return Ok(());
}
Err(delta_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
Err(delta_layer::RewriteSummaryError::Other(e)) => {
return Err(e);
}
}
anyhow::bail!("not an image or delta layer: {layer_file_path}");
}
}
Ok(())
}

View File

@@ -2,7 +2,7 @@
pub mod delta_layer;
mod filename;
mod image_layer;
pub mod image_layer;
mod inmemory_layer;
mod layer;
mod layer_desc;

View File

@@ -69,13 +69,13 @@ use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
/// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
magic: u16,
format_version: u16,
pub magic: u16,
pub format_version: u16,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
/// Block number where the 'index' part of the file begins.
pub index_start_blk: u32,
@@ -611,6 +611,61 @@ impl Drop for DeltaLayerWriter {
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}
impl DeltaLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != DELTA_FILE_MAGIC {
return Err(RewriteSummaryError::MagicMismatch);
}
let new_summary = rewrite(actual_summary);
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in DeltaLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl DeltaLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure

View File

@@ -67,20 +67,20 @@ use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
/// the 'index' starts at the block indicated by 'index_start_blk'
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct Summary {
pub struct Summary {
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
magic: u16,
format_version: u16,
pub magic: u16,
pub format_version: u16,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn: Lsn,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn: Lsn,
/// Block number where the 'index' part of the file begins.
index_start_blk: u32,
pub index_start_blk: u32,
/// Block within the 'index', where the B-tree root page is stored
index_root_blk: u32,
pub index_root_blk: u32,
// the 'values' part starts after the summary header, on block 1.
}
@@ -296,6 +296,61 @@ impl ImageLayer {
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}
impl ImageLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != IMAGE_FILE_MAGIC {
return Err(RewriteSummaryError::MagicMismatch);
}
let new_summary = rewrite(actual_summary);
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl ImageLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure