mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
Rebased version of #5234, part of #6768 This consists of three parts: 1. A refactoring and new contract for implementing and testing compaction. The logic is now in a separate crate, with no dependency on the 'pageserver' crate. It defines an interface that the real pageserver must implement, in order to call the compaction algorithm. The interface models things like delta and image layers, but just the parts that the compaction algorithm needs to make decisions. That makes it easier to unit test the algorithm and experiment with different implementations. I did not convert the current code to the new abstraction, however. When the compaction algorithm is set to "Legacy", we just use the old code. It might be worthwhile to convert the old code to the new abstraction, so that we can compare the behavior of the new algorithm against the old one, using the same simulated cases. If we do that, we have to be careful that the converted code really is equivalent to the old. This includes only trivial changes to the main pageserver code. All the new code is behind a tenant config option. So this should be pretty safe to merge, even if the new implementation is buggy, as long as we don't enable it. 2. A new compaction algorithm, implemented using the new abstraction. The new algorithm is tiered compaction. It is inspired by the PoC at PR #4539, although I did not use that code directly, as I needed the new implementation to fit the new abstraction. The algorithm here is less advanced; I did not implement partial image layers, for example. I wanted to keep it simple on purpose, so that as we add bells and whistles, we can see the effects using the included simulator. One difference to #4539 and your typical LSM tree implementations is how we keep track of the LSM tree levels. This PR doesn't have a permanent concept of a level, tier or sorted run at all. There are just delta and image layers. 
However, when compaction starts, we look at the layers that exist, and arrange them into levels, depending on their shapes. That is ephemeral: when the compaction finishes, we forget that information. This allows the new algorithm to work without any extra bookkeeping. That makes it easier to transition from the old algorithm to new, and back again. There is just a new tenant config option to choose the compaction algorithm. The default is "Legacy", meaning the current algorithm in 'main'. If you set it to "Tiered", the new algorithm is used. 3. A simulator, which implements the new abstraction. The simulator can be used to analyze write and storage amplification, without running a test with the full pageserver. It can also draw an SVG animation of the simulation, to visualize how layers are created and deleted. To run the simulator: cargo run --bin compaction-simulator run-suite --------- Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
215 lines
6.0 KiB
Rust
215 lines
6.0 KiB
Rust
use clap::{Parser, Subcommand};
|
|
use pageserver_compaction::simulator::MockTimeline;
|
|
use rand::Rng;
|
|
use std::io::Write;
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::OnceLock;
|
|
|
|
use utils::project_git_version;
|
|
|
|
// Invokes the `project_git_version!` macro imported from `utils` with the name
// `GIT_VERSION`; that name is used below as the clap `version` value of `CliOpts`.
// NOTE(review): presumably this expands to a constant holding the git revision of
// the build — confirm against the macro definition in `utils`.
project_git_version!(GIT_VERSION);
|
|
|
|
#[derive(Parser)]
|
|
#[command(
|
|
version = GIT_VERSION,
|
|
about = "Neon Pageserver compaction simulator",
|
|
long_about = "A developer tool to visualize and test compaction"
|
|
)]
|
|
#[command(propagate_version = true)]
|
|
struct CliOpts {
|
|
#[command(subcommand)]
|
|
command: Commands,
|
|
}
|
|
|
|
// Subcommands understood by the simulator binary.
//
// `main` dispatches on this enum: `RunSuite` calls `run_suite()`, and `Simulate`
// calls `simulate()` with the parsed `SimulateCmd` arguments.
#[derive(Subcommand)]
enum Commands {
    // Run the built-in suite of predefined workloads.
    RunSuite,
    // Run a single simulation with parameters given on the command line.
    Simulate(SimulateCmd),
}
|
|
|
|
// How `simulate` picks the key range for each ingested record.
#[derive(Clone, clap::ValueEnum)]
enum Distribution {
    // Every record is ingested over the whole key range.
    Uniform,
    // The key range is split at one tenth of its length: each record goes to the
    // first tenth ("hot") with probability 0.9, otherwise to the rest ("cold").
    HotCold,
}
|
|
|
|
/// Read and update pageserver metadata file
|
|
#[derive(Parser)]
|
|
struct SimulateCmd {
|
|
distribution: Distribution,
|
|
|
|
/// Number of records to digest
|
|
num_records: u64,
|
|
/// Record length
|
|
record_len: u64,
|
|
|
|
// Logical database size in MB
|
|
logical_size: u64,
|
|
}
|
|
|
|
async fn simulate(cmd: &SimulateCmd, results_path: &Path) -> anyhow::Result<()> {
|
|
let mut executor = MockTimeline::new();
|
|
|
|
// Convert the logical size in MB into a key range.
|
|
let key_range = 0..((cmd.logical_size * 1024 * 1024) / 8192);
|
|
//let key_range = u64::MIN..u64::MAX;
|
|
println!(
|
|
"starting simulation with key range {:016X}-{:016X}",
|
|
key_range.start, key_range.end
|
|
);
|
|
|
|
// helper function to print progress indicator
|
|
let print_progress = |i| -> anyhow::Result<()> {
|
|
if i == 0 || (i + 1) % 10000 == 0 || i == cmd.num_records - 1 {
|
|
print!(
|
|
"\ringested {} / {} records, {} MiB / {} MiB...",
|
|
i + 1,
|
|
cmd.num_records,
|
|
(i + 1) * cmd.record_len / (1_000_000),
|
|
cmd.num_records * cmd.record_len / (1_000_000),
|
|
);
|
|
std::io::stdout().flush()?;
|
|
}
|
|
Ok(())
|
|
};
|
|
|
|
match cmd.distribution {
|
|
Distribution::Uniform => {
|
|
for i in 0..cmd.num_records {
|
|
executor.ingest_uniform(1, cmd.record_len, &key_range)?;
|
|
executor.compact_if_needed().await?;
|
|
|
|
print_progress(i)?;
|
|
}
|
|
}
|
|
Distribution::HotCold => {
|
|
let splitpoint = key_range.start + (key_range.end - key_range.start) / 10;
|
|
let hot_key_range = 0..splitpoint;
|
|
let cold_key_range = splitpoint..key_range.end;
|
|
|
|
for i in 0..cmd.num_records {
|
|
let chosen_range = if rand::thread_rng().gen_bool(0.9) {
|
|
&hot_key_range
|
|
} else {
|
|
&cold_key_range
|
|
};
|
|
executor.ingest_uniform(1, cmd.record_len, chosen_range)?;
|
|
executor.compact_if_needed().await?;
|
|
|
|
print_progress(i)?;
|
|
}
|
|
}
|
|
}
|
|
println!("done!");
|
|
executor.flush_l0();
|
|
executor.compact_if_needed().await?;
|
|
let stats = executor.stats()?;
|
|
|
|
// Print the stats to stdout, and also to a file
|
|
print!("{stats}");
|
|
std::fs::write(results_path.join("stats.txt"), stats)?;
|
|
|
|
let animation_path = results_path.join("compaction-animation.html");
|
|
executor.draw_history(std::fs::File::create(&animation_path)?)?;
|
|
println!(
|
|
"animation: file://{}",
|
|
animation_path.canonicalize()?.display()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn run_suite_cmd(results_path: &Path, workload: &SimulateCmd) -> anyhow::Result<()> {
|
|
std::fs::create_dir(results_path)?;
|
|
|
|
set_log_file(File::create(results_path.join("log"))?);
|
|
let result = simulate(workload, results_path).await;
|
|
set_log_stdout();
|
|
result
|
|
}
|
|
|
|
async fn run_suite() -> anyhow::Result<()> {
|
|
let top_results_path = PathBuf::from(format!(
|
|
"compaction-suite-results.{}",
|
|
std::time::SystemTime::UNIX_EPOCH.elapsed()?.as_secs()
|
|
));
|
|
std::fs::create_dir(&top_results_path)?;
|
|
|
|
let workload = SimulateCmd {
|
|
distribution: Distribution::Uniform,
|
|
// Generate 20 GB of WAL
|
|
record_len: 1_000,
|
|
num_records: 20_000_000,
|
|
// Logical size 5 GB
|
|
logical_size: 5_000,
|
|
};
|
|
|
|
run_suite_cmd(&top_results_path.join("uniform-20GB-5GB"), &workload).await?;
|
|
|
|
println!(
|
|
"All tests finished. Results in {}",
|
|
top_results_path.display()
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
use std::fs::File;
|
|
use std::io::Stdout;
|
|
use std::sync::Mutex;
|
|
use tracing_subscriber::fmt::writer::EitherWriter;
|
|
use tracing_subscriber::fmt::MakeWriter;
|
|
|
|
static LOG_FILE: OnceLock<Mutex<EitherWriter<File, Stdout>>> = OnceLock::new();
|
|
fn get_log_output() -> &'static Mutex<EitherWriter<File, Stdout>> {
|
|
LOG_FILE.get_or_init(|| std::sync::Mutex::new(EitherWriter::B(std::io::stdout())))
|
|
}
|
|
|
|
/// Redirects subsequent log output to the file `f`.
///
/// # Panics
///
/// Panics if the mutex guarding the log destination is poisoned.
fn set_log_file(f: File) {
    let mut destination = get_log_output().lock().unwrap();
    *destination = EitherWriter::A(f);
}
|
|
|
|
/// Redirects subsequent log output back to stdout.
///
/// # Panics
///
/// Panics if the mutex guarding the log destination is poisoned.
fn set_log_stdout() {
    let mut destination = get_log_output().lock().unwrap();
    *destination = EitherWriter::B(std::io::stdout());
}
|
|
|
|
fn init_logging() -> anyhow::Result<()> {
|
|
// We fall back to printing all spans at info-level or above if
|
|
// the RUST_LOG environment variable is not set.
|
|
let rust_log_env_filter = || {
|
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
|
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
|
|
};
|
|
|
|
// NB: the order of the with() calls does not matter.
|
|
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
|
|
use tracing_subscriber::prelude::*;
|
|
tracing_subscriber::registry()
|
|
.with({
|
|
let log_layer = tracing_subscriber::fmt::layer()
|
|
.with_target(false)
|
|
.with_ansi(false)
|
|
.with_writer(|| get_log_output().make_writer());
|
|
log_layer.with_filter(rust_log_env_filter())
|
|
})
|
|
.init();
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
let cli = CliOpts::parse();
|
|
|
|
init_logging()?;
|
|
|
|
match cli.command {
|
|
Commands::Simulate(cmd) => {
|
|
simulate(&cmd, &PathBuf::from("/tmp/compactions.html")).await?;
|
|
}
|
|
Commands::RunSuite => {
|
|
run_suite().await?;
|
|
}
|
|
};
|
|
Ok(())
|
|
}
|