mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 20:50:37 +00:00
Compare commits
4 Commits
nikitakaly
...
test_for_u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d7240f0dd | ||
|
|
9333937946 | ||
|
|
f276f21636 | ||
|
|
7126197000 |
10
.github/actions/run-python-test-set/action.yml
vendored
10
.github/actions/run-python-test-set/action.yml
vendored
@@ -36,14 +36,6 @@ inputs:
|
||||
description: 'Region name for real s3 tests'
|
||||
required: false
|
||||
default: ''
|
||||
real_s3_access_key_id:
|
||||
description: 'Access key id'
|
||||
required: false
|
||||
default: ''
|
||||
real_s3_secret_access_key:
|
||||
description: 'Secret access key'
|
||||
required: false
|
||||
default: ''
|
||||
rerun_flaky:
|
||||
description: 'Whether to rerun flaky tests'
|
||||
required: false
|
||||
@@ -104,8 +96,6 @@ runs:
|
||||
COMPATIBILITY_POSTGRES_DISTRIB_DIR: /tmp/neon-previous/pg_install
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: ${{ inputs.build_type }}
|
||||
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
|
||||
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
|
||||
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
|
||||
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
|
||||
|
||||
12
.github/workflows/build_and_test.yml
vendored
12
.github/workflows/build_and_test.yml
vendored
@@ -346,10 +346,8 @@ jobs:
|
||||
test_selection: regress
|
||||
needs_postgres_source: true
|
||||
run_with_real_s3: true
|
||||
real_s3_bucket: ci-tests-s3
|
||||
real_s3_region: us-west-2
|
||||
real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}"
|
||||
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
|
||||
real_s3_bucket: neon-github-ci-tests
|
||||
real_s3_region: eu-central-1
|
||||
rerun_flaky: true
|
||||
pg_version: ${{ matrix.pg_version }}
|
||||
env:
|
||||
@@ -872,7 +870,7 @@ jobs:
|
||||
|
||||
- name: Add latest tag to images
|
||||
if: |
|
||||
github.ref_name == 'main' &&
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane tag 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} latest
|
||||
@@ -884,7 +882,7 @@ jobs:
|
||||
|
||||
- name: Push images to production ECR
|
||||
if: |
|
||||
github.ref_name == 'main' &&
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:${{needs.tag.outputs.build-tag}} 093970136003.dkr.ecr.eu-central-1.amazonaws.com/neon:latest
|
||||
@@ -907,7 +905,7 @@ jobs:
|
||||
|
||||
- name: Push latest tags to Docker Hub
|
||||
if: |
|
||||
github.ref_name == 'main' &&
|
||||
(github.ref_name == 'main' || github.ref_name == 'release') &&
|
||||
github.event_name != 'workflow_dispatch'
|
||||
run: |
|
||||
crane tag neondatabase/neon:${{needs.tag.outputs.build-tag}} latest
|
||||
|
||||
15
Cargo.lock
generated
15
Cargo.lock
generated
@@ -2587,6 +2587,21 @@ version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "pagectl"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"clap 4.2.2",
|
||||
"git-version",
|
||||
"pageserver",
|
||||
"postgres_ffi",
|
||||
"svg_fmt",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver"
|
||||
version = "0.1.0"
|
||||
|
||||
@@ -3,6 +3,7 @@ members = [
|
||||
"compute_tools",
|
||||
"control_plane",
|
||||
"pageserver",
|
||||
"pageserver/ctl",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"storage_broker",
|
||||
|
||||
@@ -47,8 +47,7 @@ RUN set -e \
|
||||
&& mold -run cargo build \
|
||||
--bin pg_sni_router \
|
||||
--bin pageserver \
|
||||
--bin pageserver_binutils \
|
||||
--bin draw_timeline_dir \
|
||||
--bin pagectl \
|
||||
--bin safekeeper \
|
||||
--bin storage_broker \
|
||||
--bin proxy \
|
||||
@@ -73,8 +72,7 @@ RUN set -e \
|
||||
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pagectl /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
|
||||
|
||||
18
pageserver/ctl/Cargo.toml
Normal file
18
pageserver/ctl/Cargo.toml
Normal file
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "pagectl"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
clap = { workspace = true, features = ["string"] }
|
||||
git-version.workspace = true
|
||||
pageserver = { path = ".." }
|
||||
postgres_ffi.workspace = true
|
||||
utils.workspace = true
|
||||
svg_fmt.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
@@ -12,7 +12,7 @@
|
||||
//! Example use:
|
||||
//! ```
|
||||
//! $ ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
|
||||
//! $ grep "__" | cargo run --release --bin draw_timeline_dir > out.svg
|
||||
//! $ grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
|
||||
//! $ firefox out.svg
|
||||
//! ```
|
||||
//!
|
||||
@@ -62,7 +62,7 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
|
||||
(keys, lsns)
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
pub fn main() -> Result<()> {
|
||||
// Parse layer filenames from stdin
|
||||
let mut ranges: Vec<(Range<Key>, Range<Lsn>)> = vec![];
|
||||
let stdin = io::stdin();
|
||||
@@ -6,7 +6,7 @@ use anyhow::Result;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::ops::Range;
|
||||
use std::{env, fs, path::Path, path::PathBuf, str, str::FromStr};
|
||||
use std::{fs, path::Path, str};
|
||||
|
||||
use pageserver::page_cache::PAGE_SZ;
|
||||
use pageserver::repository::{Key, KEY_SIZE};
|
||||
@@ -18,12 +18,14 @@ use pageserver::virtual_file::VirtualFile;
|
||||
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
use crate::AnalyzeLayerMapCmd;
|
||||
|
||||
const MIN_HOLE_LENGTH: i128 = (128 * 1024 * 1024 / PAGE_SZ) as i128;
|
||||
const DEFAULT_MAX_HOLES: usize = 10;
|
||||
|
||||
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
|
||||
#[derive(PartialEq, Eq)]
|
||||
struct Hole(Range<Key>);
|
||||
pub struct Hole(Range<Key>);
|
||||
|
||||
impl Ord for Hole {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
@@ -39,11 +41,11 @@ impl PartialOrd for Hole {
|
||||
}
|
||||
}
|
||||
|
||||
struct LayerFile {
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
is_delta: bool,
|
||||
holes: Vec<Hole>,
|
||||
pub(crate) struct LayerFile {
|
||||
pub key_range: Range<Key>,
|
||||
pub lsn_range: Range<Lsn>,
|
||||
pub is_delta: bool,
|
||||
pub holes: Vec<Hole>,
|
||||
}
|
||||
|
||||
impl LayerFile {
|
||||
@@ -67,7 +69,7 @@ impl LayerFile {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_filename(name: &str) -> Option<LayerFile> {
|
||||
pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
|
||||
let split: Vec<&str> = name.split("__").collect();
|
||||
if split.len() != 2 {
|
||||
return None;
|
||||
@@ -127,18 +129,9 @@ fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
Ok(holes)
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let args: Vec<String> = env::args().collect();
|
||||
if args.len() < 2 {
|
||||
println!("Usage: layer_map_analyzer PAGESERVER_DATA_DIR [MAX_HOLES]");
|
||||
return Ok(());
|
||||
}
|
||||
let storage_path = PathBuf::from_str(&args[1])?;
|
||||
let max_holes = if args.len() > 2 {
|
||||
args[2].parse::<usize>().unwrap()
|
||||
} else {
|
||||
DEFAULT_MAX_HOLES
|
||||
};
|
||||
pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let storage_path = &cmd.path;
|
||||
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
|
||||
|
||||
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(10);
|
||||
169
pageserver/ctl/src/layers.rs
Normal file
169
pageserver/ctl/src/layers.rs
Normal file
@@ -0,0 +1,169 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::Result;
|
||||
use clap::Subcommand;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
||||
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver::{
|
||||
repository::{Key, KEY_SIZE},
|
||||
tenant::{
|
||||
block_io::FileBlockReader, disk_btree::VisitDirection,
|
||||
storage_layer::delta_layer::DELTA_KEY_SIZE,
|
||||
},
|
||||
virtual_file::VirtualFile,
|
||||
};
|
||||
use std::fs;
|
||||
use utils::bin_ser::BeSer;
|
||||
|
||||
use crate::layer_map_analyzer::parse_filename;
|
||||
|
||||
#[derive(Subcommand)]
|
||||
pub(crate) enum LayerCmd {
|
||||
/// List all tenants and timelines under the pageserver path
|
||||
///
|
||||
/// Example: `cargo run --bin pagectl layer list .neon/`
|
||||
List { path: PathBuf },
|
||||
/// List all layers of a given tenant and timeline
|
||||
///
|
||||
/// Example: `cargo run --bin pagectl layer list .neon/`
|
||||
ListLayer {
|
||||
path: PathBuf,
|
||||
tenant: String,
|
||||
timeline: String,
|
||||
},
|
||||
/// Dump all information of a layer file
|
||||
DumpLayer {
|
||||
path: PathBuf,
|
||||
tenant: String,
|
||||
timeline: String,
|
||||
/// The id from list-layer command
|
||||
id: usize,
|
||||
},
|
||||
}
|
||||
|
||||
fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
use pageserver::tenant::blob_io::BlobCursor;
|
||||
use pageserver::tenant::block_io::BlockReader;
|
||||
|
||||
let path = path.as_ref();
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
actual_summary.index_start_blk,
|
||||
actual_summary.index_root_blk,
|
||||
&file,
|
||||
);
|
||||
// TODO(chi): dedup w/ `delta_layer.rs` by exposing the API.
|
||||
let mut all = vec![];
|
||||
tree_reader.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, value_offset| {
|
||||
let curr = Key::from_slice(&key[..KEY_SIZE]);
|
||||
all.push((curr, BlobRef(value_offset)));
|
||||
true
|
||||
},
|
||||
)?;
|
||||
let mut cursor = BlockCursor::new(&file);
|
||||
for (k, v) in all {
|
||||
let value = cursor.read_blob(v.pos())?;
|
||||
println!("key:{} value_len:{}", k, value.len());
|
||||
}
|
||||
// TODO(chi): special handling for last key?
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
match cmd {
|
||||
LayerCmd::List { path } => {
|
||||
for tenant in fs::read_dir(path.join("tenants"))? {
|
||||
let tenant = tenant?;
|
||||
if !tenant.file_type()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
println!("tenant {}", tenant.file_name().to_string_lossy());
|
||||
for timeline in fs::read_dir(tenant.path().join("timelines"))? {
|
||||
let timeline = timeline?;
|
||||
if !timeline.file_type()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
println!("- timeline {}", timeline.file_name().to_string_lossy());
|
||||
}
|
||||
}
|
||||
}
|
||||
LayerCmd::ListLayer {
|
||||
path,
|
||||
tenant,
|
||||
timeline,
|
||||
} => {
|
||||
let timeline_path = path
|
||||
.join("tenants")
|
||||
.join(tenant)
|
||||
.join("timelines")
|
||||
.join(timeline);
|
||||
let mut idx = 0;
|
||||
for layer in fs::read_dir(timeline_path)? {
|
||||
let layer = layer?;
|
||||
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
|
||||
{
|
||||
println!(
|
||||
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
||||
idx,
|
||||
layer_file.key_range.start,
|
||||
layer_file.key_range.end,
|
||||
layer_file.lsn_range.start,
|
||||
layer_file.lsn_range.end,
|
||||
layer_file.is_delta,
|
||||
);
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
LayerCmd::DumpLayer {
|
||||
path,
|
||||
tenant,
|
||||
timeline,
|
||||
id,
|
||||
} => {
|
||||
let timeline_path = path
|
||||
.join("tenants")
|
||||
.join(tenant)
|
||||
.join("timelines")
|
||||
.join(timeline);
|
||||
let mut idx = 0;
|
||||
for layer in fs::read_dir(timeline_path)? {
|
||||
let layer = layer?;
|
||||
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
|
||||
{
|
||||
if *id == idx {
|
||||
// TODO(chi): dedup code
|
||||
println!(
|
||||
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
|
||||
idx,
|
||||
layer_file.key_range.start,
|
||||
layer_file.key_range.end,
|
||||
layer_file.lsn_range.start,
|
||||
layer_file.lsn_range.end,
|
||||
layer_file.is_delta,
|
||||
);
|
||||
|
||||
if layer_file.is_delta {
|
||||
read_delta_file(layer.path())?;
|
||||
} else {
|
||||
anyhow::bail!("not supported yet :(");
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
179
pageserver/ctl/src/main.rs
Normal file
179
pageserver/ctl/src/main.rs
Normal file
@@ -0,0 +1,179 @@
|
||||
//! A helper tool to manage pageserver binary files.
|
||||
//! Accepts a file as an argument, attempts to parse it with all ways possible
|
||||
//! and prints its interpreted context.
|
||||
//!
|
||||
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
|
||||
|
||||
mod draw_timeline_dir;
|
||||
mod layer_map_analyzer;
|
||||
mod layers;
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use layers::LayerCmd;
|
||||
use pageserver::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use std::path::{Path, PathBuf};
|
||||
use utils::{lsn::Lsn, project_git_version};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(
|
||||
version = GIT_VERSION,
|
||||
about = "Neon Pageserver binutils",
|
||||
long_about = "Reads pageserver (and related) binary files management utility"
|
||||
)]
|
||||
#[command(propagate_version = true)]
|
||||
struct CliOpts {
|
||||
#[command(subcommand)]
|
||||
command: Commands,
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum Commands {
|
||||
Metadata(MetadataCmd),
|
||||
PrintLayerFile(PrintLayerFileCmd),
|
||||
DrawTimeline {},
|
||||
AnalyzeLayerMap(AnalyzeLayerMapCmd),
|
||||
#[command(subcommand)]
|
||||
Layer(LayerCmd),
|
||||
}
|
||||
|
||||
/// Read and update pageserver metadata file
|
||||
#[derive(Parser)]
|
||||
struct MetadataCmd {
|
||||
/// Input metadata file path
|
||||
metadata_path: PathBuf,
|
||||
/// Replace disk consistent Lsn
|
||||
disk_consistent_lsn: Option<Lsn>,
|
||||
/// Replace previous record Lsn
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
/// Replace latest gc cuttoff
|
||||
latest_gc_cuttoff: Option<Lsn>,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
struct PrintLayerFileCmd {
|
||||
/// Pageserver data path
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
struct AnalyzeLayerMapCmd {
|
||||
/// Pageserver data path
|
||||
path: PathBuf,
|
||||
/// Max holes
|
||||
max_holes: Option<usize>,
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let cli = CliOpts::parse();
|
||||
|
||||
match cli.command {
|
||||
Commands::Layer(cmd) => {
|
||||
layers::main(&cmd)?;
|
||||
}
|
||||
Commands::Metadata(cmd) => {
|
||||
handle_metadata(&cmd)?;
|
||||
}
|
||||
Commands::DrawTimeline {} => {
|
||||
draw_timeline_dir::main()?;
|
||||
}
|
||||
Commands::AnalyzeLayerMap(cmd) => {
|
||||
layer_map_analyzer::main(&cmd)?;
|
||||
}
|
||||
Commands::PrintLayerFile(cmd) => {
|
||||
if let Err(e) = read_pg_control_file(&cmd.path) {
|
||||
println!(
|
||||
"Failed to read input file as a pg control one: {e:#}\n\
|
||||
Attempting to read it as layer file"
|
||||
);
|
||||
print_layerfile(&cmd.path)?;
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
|
||||
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
|
||||
println!("{control_file:?}");
|
||||
let control_file_initdb = Lsn(control_file.checkPoint);
|
||||
println!(
|
||||
"pg_initdb_lsn: {}, aligned: {}",
|
||||
control_file_initdb,
|
||||
control_file_initdb.align()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx)
|
||||
}
|
||||
|
||||
fn handle_metadata(
|
||||
MetadataCmd {
|
||||
metadata_path: path,
|
||||
disk_consistent_lsn,
|
||||
prev_record_lsn,
|
||||
latest_gc_cuttoff,
|
||||
}: &MetadataCmd,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let metadata_bytes = std::fs::read(path)?;
|
||||
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
|
||||
println!("Current metadata:\n{meta:?}");
|
||||
let mut update_meta = false;
|
||||
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
|
||||
meta = TimelineMetadata::new(
|
||||
*disk_consistent_lsn,
|
||||
meta.prev_record_lsn(),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
if let Some(prev_record_lsn) = prev_record_lsn {
|
||||
meta = TimelineMetadata::new(
|
||||
meta.disk_consistent_lsn(),
|
||||
Some(*prev_record_lsn),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
if let Some(latest_gc_cuttoff) = latest_gc_cuttoff {
|
||||
meta = TimelineMetadata::new(
|
||||
meta.disk_consistent_lsn(),
|
||||
meta.prev_record_lsn(),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
*latest_gc_cuttoff,
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
|
||||
if update_meta {
|
||||
let metadata_bytes = meta.to_bytes()?;
|
||||
std::fs::write(path, metadata_bytes)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,174 +0,0 @@
|
||||
//! A helper tool to manage pageserver binary files.
|
||||
//! Accepts a file as an argument, attempts to parse it with all ways possible
|
||||
//! and prints its interpreted context.
|
||||
//!
|
||||
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
str::FromStr,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use clap::{value_parser, Arg, Command};
|
||||
|
||||
use pageserver::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use utils::{lsn::Lsn, project_git_version};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
const METADATA_SUBCOMMAND: &str = "metadata";
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let arg_matches = cli().get_matches();
|
||||
|
||||
match arg_matches.subcommand() {
|
||||
Some((subcommand_name, subcommand_matches)) => {
|
||||
let path = subcommand_matches
|
||||
.get_one::<PathBuf>("metadata_path")
|
||||
.context("'metadata_path' argument is missing")?
|
||||
.to_path_buf();
|
||||
anyhow::ensure!(
|
||||
subcommand_name == METADATA_SUBCOMMAND,
|
||||
"Unknown subcommand {subcommand_name}"
|
||||
);
|
||||
handle_metadata(&path, subcommand_matches)?;
|
||||
}
|
||||
None => {
|
||||
let path = arg_matches
|
||||
.get_one::<PathBuf>("path")
|
||||
.context("'path' argument is missing")?
|
||||
.to_path_buf();
|
||||
println!(
|
||||
"No subcommand specified, attempting to guess the format for file {}",
|
||||
path.display()
|
||||
);
|
||||
if let Err(e) = read_pg_control_file(&path) {
|
||||
println!(
|
||||
"Failed to read input file as a pg control one: {e:#}\n\
|
||||
Attempting to read it as layer file"
|
||||
);
|
||||
print_layerfile(&path)?;
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
|
||||
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
|
||||
println!("{control_file:?}");
|
||||
let control_file_initdb = Lsn(control_file.checkPoint);
|
||||
println!(
|
||||
"pg_initdb_lsn: {}, aligned: {}",
|
||||
control_file_initdb,
|
||||
control_file_initdb.align()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx)
|
||||
}
|
||||
|
||||
fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {
|
||||
let metadata_bytes = std::fs::read(path)?;
|
||||
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
|
||||
println!("Current metadata:\n{meta:?}");
|
||||
let mut update_meta = false;
|
||||
if let Some(disk_consistent_lsn) = arg_matches.get_one::<String>("disk_consistent_lsn") {
|
||||
meta = TimelineMetadata::new(
|
||||
Lsn::from_str(disk_consistent_lsn)?,
|
||||
meta.prev_record_lsn(),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
if let Some(prev_record_lsn) = arg_matches.get_one::<String>("prev_record_lsn") {
|
||||
meta = TimelineMetadata::new(
|
||||
meta.disk_consistent_lsn(),
|
||||
Some(Lsn::from_str(prev_record_lsn)?),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
meta.latest_gc_cutoff_lsn(),
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
if let Some(latest_gc_cuttoff) = arg_matches.get_one::<String>("latest_gc_cuttoff") {
|
||||
meta = TimelineMetadata::new(
|
||||
meta.disk_consistent_lsn(),
|
||||
meta.prev_record_lsn(),
|
||||
meta.ancestor_timeline(),
|
||||
meta.ancestor_lsn(),
|
||||
Lsn::from_str(latest_gc_cuttoff)?,
|
||||
meta.initdb_lsn(),
|
||||
meta.pg_version(),
|
||||
);
|
||||
update_meta = true;
|
||||
}
|
||||
|
||||
if update_meta {
|
||||
let metadata_bytes = meta.to_bytes()?;
|
||||
std::fs::write(path, metadata_bytes)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cli() -> Command {
|
||||
Command::new("Neon Pageserver binutils")
|
||||
.about("Reads pageserver (and related) binary files management utility")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
Arg::new("path")
|
||||
.help("Input file path")
|
||||
.value_parser(value_parser!(PathBuf))
|
||||
.required(false),
|
||||
)
|
||||
.subcommand(
|
||||
Command::new(METADATA_SUBCOMMAND)
|
||||
.about("Read and update pageserver metadata file")
|
||||
.arg(
|
||||
Arg::new("metadata_path")
|
||||
.help("Input metadata file path")
|
||||
.value_parser(value_parser!(PathBuf))
|
||||
.required(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("disk_consistent_lsn")
|
||||
.long("disk_consistent_lsn")
|
||||
.help("Replace disk consistent Lsn"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("prev_record_lsn")
|
||||
.long("prev_record_lsn")
|
||||
.help("Replace previous record Lsn"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("latest_gc_cuttoff")
|
||||
.long("latest_gc_cuttoff")
|
||||
.help("Replace latest gc cuttoff"),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_cli() {
|
||||
cli().debug_assert();
|
||||
}
|
||||
@@ -77,7 +77,7 @@ use utils::{
|
||||
lsn::{Lsn, RecordLsn},
|
||||
};
|
||||
|
||||
mod blob_io;
|
||||
pub mod blob_io;
|
||||
pub mod block_io;
|
||||
pub mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
|
||||
@@ -56,6 +56,7 @@ use std::collections::VecDeque;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use utils::lsn::Lsn;
|
||||
use tracing::*;
|
||||
|
||||
use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
pub use historic_layer_coverage::Replacement;
|
||||
@@ -275,11 +276,14 @@ where
|
||||
///
|
||||
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
|
||||
// TODO: See #3869, resulting #4088, attempted fix and repro #4094
|
||||
self.historic.insert(
|
||||
historic_layer_coverage::LayerKey::from(&*layer),
|
||||
Arc::clone(&layer),
|
||||
);
|
||||
|
||||
let key = historic_layer_coverage::LayerKey::from(&*layer);
|
||||
if self.historic.contains(&key) {
|
||||
error!(
|
||||
"Attempt to insert duplicate layer {} in layer map",
|
||||
layer.short_id()
|
||||
);
|
||||
}
|
||||
self.historic.insert(key, Arc::clone(&layer));
|
||||
if Self::is_l0(&layer) {
|
||||
self.l0_delta_layers.push(layer);
|
||||
}
|
||||
|
||||
@@ -417,7 +417,15 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
|
||||
pub fn contains(&self, layer_key: &LayerKey) -> bool {
|
||||
match self.buffer.get(layer_key) {
|
||||
Some(None) => false, // layer remove was buffered
|
||||
Some(_) => true, // layer insert was buffered
|
||||
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
|
||||
self.buffer.insert(layer_key, Some(value));
|
||||
}
|
||||
|
||||
|
||||
@@ -542,7 +542,7 @@ impl From<LayerFileName> for LayerDescriptor {
|
||||
///
|
||||
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
|
||||
/// global config, and paths to layer files are constructed using the tenant/timeline
|
||||
/// path from the config. But in the 'pageserver_binutils' binary, we need to construct a Layer
|
||||
/// path from the config. But in the 'pagectl' binary, we need to construct a Layer
|
||||
/// struct for a file on disk, without having a page server running, so that we have no
|
||||
/// config. In that case, we use the Path variant to hold the full path to the file on
|
||||
/// disk.
|
||||
|
||||
@@ -110,7 +110,7 @@ const WILL_INIT: u64 = 1;
|
||||
/// reading/deserializing records themselves.
|
||||
///
|
||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||
struct BlobRef(u64);
|
||||
pub struct BlobRef(pub u64);
|
||||
|
||||
impl BlobRef {
|
||||
pub fn will_init(&self) -> bool {
|
||||
@@ -619,7 +619,7 @@ impl DeltaLayer {
|
||||
|
||||
/// Create a DeltaLayer struct representing an existing file on disk.
|
||||
///
|
||||
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
|
||||
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
|
||||
pub fn new_for_path(path: &Path, file: File) -> Result<Self> {
|
||||
let mut summary_buf = Vec::new();
|
||||
summary_buf.resize(PAGE_SZ, 0);
|
||||
|
||||
@@ -422,7 +422,7 @@ impl ImageLayer {
|
||||
|
||||
/// Create an ImageLayer struct representing an existing file on disk.
|
||||
///
|
||||
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
|
||||
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
|
||||
pub fn new_for_path(path: &Path, file: File) -> Result<ImageLayer> {
|
||||
let mut summary_buf = Vec::new();
|
||||
summary_buf.resize(PAGE_SZ, 0);
|
||||
|
||||
@@ -3473,6 +3473,10 @@ impl Timeline {
|
||||
|
||||
drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
|
||||
|
||||
fail_point!("compact-level0-phase1-finish", |_| {
|
||||
Err(anyhow::anyhow!("failpoint compact-level0-phase1-finish").into())
|
||||
});
|
||||
|
||||
Ok(CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
|
||||
43
test_runner/performance/test_duplicate_layers.py
Normal file
43
test_runner/performance/test_duplicate_layers.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
|
||||
|
||||
# Test duplicate layer detection
|
||||
#
|
||||
# This test sets fail point at the end of first compaction phase:
|
||||
# after flushing new L1 layers but before deletion of L0 layes
|
||||
# It should cause generation of duplicate L1 layer by compaction after restart
|
||||
@pytest.mark.timeout(600)
|
||||
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# These warnings are expected, when the pageserver is restarted abruptly
|
||||
env.pageserver.allowed_errors.append(".*found future image layer.*")
|
||||
env.pageserver.allowed_errors.append(".*found future delta layer.*")
|
||||
#env.pageserver.allowed_errors.append(".*duplicate layer.*")
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# Use aggressive compaction and checkpoint settings
|
||||
tenant_id, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
"compaction_target_size": f"{1024 ** 2}",
|
||||
"compaction_period": "5 s",
|
||||
"compaction_threshold": "5",
|
||||
}
|
||||
)
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
connstr = endpoint.connstr(options="-csynchronous_commit=off")
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
|
||||
|
||||
pageserver_http.configure_failpoints(("compact-level0-phase1-finish", "exit"))
|
||||
|
||||
with pytest.raises(Exception):
|
||||
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
|
||||
time.sleep(6) # let compaction to be performed
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
time.sleep(6) # let compaction to be performed
|
||||
Reference in New Issue
Block a user