Add new compaction abstraction, simulator, and implementation. (#6830)

Rebased version of #5234, part of #6768

This consists of three parts:

1. A refactoring and new contract for implementing and testing
compaction.

The logic is now in a separate crate, with no dependency on the
'pageserver' crate. It defines an interface that the real pageserver
must implement, in order to call the compaction algorithm. The interface
models things like delta and image layers, but just the parts that the
compaction algorithm needs to make decisions. That makes it easier unit
test the algorithm and experiment with different implementations.

I did not convert the current code to the new abstraction, however. When
compaction algorithm is set to "Legacy", we just use the old code. It
might be worthwhile to convert the old code to the new abstraction, so
that we can compare the behavior of the new algorithm against the old
one, using the same simulated cases. If we do that, have to be careful
that the converted code really is equivalent to the old.

This inclues only trivial changes to the main pageserver code. All the
new code is behind a tenant config option. So this should be pretty safe
to merge, even if the new implementation is buggy, as long as we don't
enable it.

2. A new compaction algorithm, implemented using the new abstraction.

The new algorithm is tiered compaction. It is inspired by the PoC at PR
#4539, although I did not use that code directly, as I needed the new
implementation to fit the new abstraction. The algorithm here is less
advanced, I did not implement partial image layers, for example. I
wanted to keep it simple on purpose, so that as we add bells and
whistles, we can see the effects using the included simulator.

One difference to #4539 and your typical LSM tree implementations is how
we keep track of the LSM tree levels. This PR doesn't have a permanent
concept of a level, tier or sorted run at all. There are just delta and
image layers. However, when compaction starts, we look at the layers
that exist, and arrange them into levels, depending on their shapes.
That is ephemeral: when the compaction finishes, we forget that
information. This allows the new algorithm to work without any extra
bookkeeping. That makes it easier to transition from the old algorithm
to new, and back again.

There is just a new tenant config option to choose the compaction
algorithm. The default is "Legacy", meaning the current algorithm in
'main'. If you set it to "Tiered", the new algorithm is used.

3. A simulator, which implements the new abstraction.

The simulator can be used to analyze write and storage amplification,
without running a test with the full pageserver. It can also draw an SVG
animation of the simulation, to visualize how layers are created and
deleted.

To run the simulator:

    cargo run --bin compaction-simulator run-suite

---------

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
Arpad Müller
2024-02-27 17:15:46 +01:00
committed by GitHub
parent c8ac4c054e
commit 045bc6af8b
25 changed files with 3687 additions and 14 deletions

1
.gitignore vendored
View File

@@ -9,6 +9,7 @@ test_output/
neon.iml
/.neon
/integration_tests/.neon
compaction-suite-results.*
# Coverage
*.profraw

48
Cargo.lock generated
View File

@@ -3498,6 +3498,7 @@ dependencies = [
"num_cpus",
"once_cell",
"pageserver_api",
"pageserver_compaction",
"pin-project-lite",
"postgres",
"postgres-protocol",
@@ -3588,6 +3589,53 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_compaction"
version = "0.1.0"
dependencies = [
"anyhow",
"async-compression",
"async-stream",
"async-trait",
"byteorder",
"bytes",
"chrono",
"clap",
"const_format",
"consumption_metrics",
"criterion",
"crossbeam-utils",
"either",
"fail",
"flate2",
"futures",
"git-version",
"hex",
"hex-literal",
"humantime",
"humantime-serde",
"itertools",
"metrics",
"once_cell",
"pageserver_api",
"pin-project-lite",
"rand 0.8.5",
"smallvec",
"svg_fmt",
"sync_wrapper",
"thiserror",
"tokio",
"tokio-io-timeout",
"tokio-util",
"tracing",
"tracing-error",
"tracing-subscriber",
"url",
"utils",
"walkdir",
"workspace_hack",
]
[[package]]
name = "parking"
version = "2.1.1"

View File

@@ -5,6 +5,7 @@ members = [
"control_plane",
"control_plane/attachment_service",
"pageserver",
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/pagebench",
@@ -199,6 +200,7 @@ consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }

View File

@@ -352,6 +352,11 @@ impl PageServerNode {
.remove("compaction_threshold")
.map(|x| x.parse::<usize>())
.transpose()?,
compaction_algorithm: settings
.remove("compaction_algorithm")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'compaction_algorithm' json")?,
gc_horizon: settings
.remove("gc_horizon")
.map(|x| x.parse::<u64>())
@@ -455,6 +460,11 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'compaction_threshold' as an integer")?,
compaction_algorithm: settings
.remove("compactin_algorithm")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'compaction_algorithm' json")?,
gc_horizon: settings
.remove("gc_horizon")
.map(|x| x.parse::<u64>())

View File

@@ -307,6 +307,7 @@ impl KeySpaceRandomAccum {
}
}
#[inline(always)]
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
let end = key_range.end;

View File

@@ -272,6 +272,8 @@ pub struct TenantConfig {
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
pub compaction_threshold: Option<usize>,
// defer parsing compaction_algorithm, like eviction_policy
pub compaction_algorithm: Option<CompactionAlgorithm>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
pub image_creation_threshold: Option<usize>,
@@ -306,6 +308,13 @@ impl EvictionPolicy {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum CompactionAlgorithm {
Legacy,
Tiered,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct EvictionPolicyLayerAccessThreshold {
#[serde(with = "humantime_serde")]

View File

@@ -73,6 +73,7 @@ url.workspace = true
walkdir.workspace = true
metrics.workspace = true
pageserver_api.workspace = true
pageserver_compaction.workspace = true
postgres_connection.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true

View File

@@ -0,0 +1,54 @@
[package]
name = "pageserver_compaction"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
[dependencies]
anyhow.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["string"] }
const_format.workspace = true
consumption_metrics.workspace = true
crossbeam-utils.workspace = true
either.workspace = true
flate2.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
once_cell.workspace = true
pageserver_api.workspace = true
pin-project-lite.workspace = true
rand.workspace = true
smallvec = { workspace = true, features = ["write"] }
svg_fmt.workspace = true
sync_wrapper.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-io-timeout.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
walkdir.workspace = true
metrics.workspace = true
utils.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
criterion.workspace = true
hex-literal.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }

View File

@@ -0,0 +1,51 @@
# TODO
- If the key space can be perfectly partitioned at some key, perform planning on each
partition separately. For example, if we are compacting a level with layers like this:
```
:
+--+ +----+ : +------+
| | | | : | |
+--+ +----+ : +------+
:
+-----+ +-+ : +--------+
| | | | : | |
+-----+ +-+ : +--------+
:
```
At the dotted line, there is a natural split in the key space, such that all
layers are either on the left or the right of it. We can compact the
partitions separately. We could choose to create image layers for one
partition but not the other one, for example.
- All the layers don't have to be exactly the same size, we can choose to cut a
layer short or stretch it a little larger than the target size, if it helps
the overall system. We can help perfect partitions (see previous bullet point)
to happen more frequently, by choosing the cut points wisely. For example, try
to cut layers at boundaries of underlying image layers. And "snap to grid",
i.e. don't cut layers at any key, but e.g. only when key % 10000 = 0.
- Avoid rewriting layers when we'd just create an identical layer to an input
layer.
- Parallelism. The code is already split up into planning and execution, so that
we first split up the compaction work into "Jobs", and then execute them.
It would be straightforward to execute multiple jobs in parallel.
- Materialize extra pages in delta layers during compaction. This would reduce
read amplification. There has been the idea of partial image layers. Materializing
extra pages in the delta layers achieve the same goal, without introducing a new
concept.
## Simulator
- Expand the simulator for more workloads
- Automate a test suite that runs the simluator with different workloads and
spits out a table of results
- Model read amplification
- More sanity checking. One idea is to keep a reference count of each
MockRecord, i.e. use Arc<MockRecord> instead of plain MockRecord, and panic if
a MockRecord that is newer than PITR horizon is completely dropped. That would
indicate that the record was lost.

View File

@@ -0,0 +1,214 @@
use clap::{Parser, Subcommand};
use pageserver_compaction::simulator::MockTimeline;
use rand::Rng;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use utils::project_git_version;
project_git_version!(GIT_VERSION);
#[derive(Parser)]
#[command(
version = GIT_VERSION,
about = "Neon Pageserver compaction simulator",
long_about = "A developer tool to visualize and test compaction"
)]
#[command(propagate_version = true)]
struct CliOpts {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
RunSuite,
Simulate(SimulateCmd),
}
#[derive(Clone, clap::ValueEnum)]
enum Distribution {
Uniform,
HotCold,
}
/// Read and update pageserver metadata file
#[derive(Parser)]
struct SimulateCmd {
distribution: Distribution,
/// Number of records to digest
num_records: u64,
/// Record length
record_len: u64,
// Logical database size in MB
logical_size: u64,
}
async fn simulate(cmd: &SimulateCmd, results_path: &Path) -> anyhow::Result<()> {
let mut executor = MockTimeline::new();
// Convert the logical size in MB into a key range.
let key_range = 0..((cmd.logical_size * 1024 * 1024) / 8192);
//let key_range = u64::MIN..u64::MAX;
println!(
"starting simulation with key range {:016X}-{:016X}",
key_range.start, key_range.end
);
// helper function to print progress indicator
let print_progress = |i| -> anyhow::Result<()> {
if i == 0 || (i + 1) % 10000 == 0 || i == cmd.num_records - 1 {
print!(
"\ringested {} / {} records, {} MiB / {} MiB...",
i + 1,
cmd.num_records,
(i + 1) * cmd.record_len / (1_000_000),
cmd.num_records * cmd.record_len / (1_000_000),
);
std::io::stdout().flush()?;
}
Ok(())
};
match cmd.distribution {
Distribution::Uniform => {
for i in 0..cmd.num_records {
executor.ingest_uniform(1, cmd.record_len, &key_range)?;
executor.compact_if_needed().await?;
print_progress(i)?;
}
}
Distribution::HotCold => {
let splitpoint = key_range.start + (key_range.end - key_range.start) / 10;
let hot_key_range = 0..splitpoint;
let cold_key_range = splitpoint..key_range.end;
for i in 0..cmd.num_records {
let chosen_range = if rand::thread_rng().gen_bool(0.9) {
&hot_key_range
} else {
&cold_key_range
};
executor.ingest_uniform(1, cmd.record_len, chosen_range)?;
executor.compact_if_needed().await?;
print_progress(i)?;
}
}
}
println!("done!");
executor.flush_l0();
executor.compact_if_needed().await?;
let stats = executor.stats()?;
// Print the stats to stdout, and also to a file
print!("{stats}");
std::fs::write(results_path.join("stats.txt"), stats)?;
let animation_path = results_path.join("compaction-animation.html");
executor.draw_history(std::fs::File::create(&animation_path)?)?;
println!(
"animation: file://{}",
animation_path.canonicalize()?.display()
);
Ok(())
}
async fn run_suite_cmd(results_path: &Path, workload: &SimulateCmd) -> anyhow::Result<()> {
std::fs::create_dir(results_path)?;
set_log_file(File::create(results_path.join("log"))?);
let result = simulate(workload, results_path).await;
set_log_stdout();
result
}
async fn run_suite() -> anyhow::Result<()> {
let top_results_path = PathBuf::from(format!(
"compaction-suite-results.{}",
std::time::SystemTime::UNIX_EPOCH.elapsed()?.as_secs()
));
std::fs::create_dir(&top_results_path)?;
let workload = SimulateCmd {
distribution: Distribution::Uniform,
// Generate 20 GB of WAL
record_len: 1_000,
num_records: 20_000_000,
// Logical size 5 GB
logical_size: 5_000,
};
run_suite_cmd(&top_results_path.join("uniform-20GB-5GB"), &workload).await?;
println!(
"All tests finished. Results in {}",
top_results_path.display()
);
Ok(())
}
use std::fs::File;
use std::io::Stdout;
use std::sync::Mutex;
use tracing_subscriber::fmt::writer::EitherWriter;
use tracing_subscriber::fmt::MakeWriter;
static LOG_FILE: OnceLock<Mutex<EitherWriter<File, Stdout>>> = OnceLock::new();
fn get_log_output() -> &'static Mutex<EitherWriter<File, Stdout>> {
LOG_FILE.get_or_init(|| std::sync::Mutex::new(EitherWriter::B(std::io::stdout())))
}
fn set_log_file(f: File) {
*get_log_output().lock().unwrap() = EitherWriter::A(f);
}
fn set_log_stdout() {
*get_log_output().lock().unwrap() = EitherWriter::B(std::io::stdout());
}
fn init_logging() -> anyhow::Result<()> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(false)
.with_writer(|| get_log_output().make_writer());
log_layer.with_filter(rust_log_env_filter())
})
.init();
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = CliOpts::parse();
init_logging()?;
match cli.command {
Commands::Simulate(cmd) => {
simulate(&cmd, &PathBuf::from("/tmp/compactions.html")).await?;
}
Commands::RunSuite => {
run_suite().await?;
}
};
Ok(())
}

View File

@@ -0,0 +1,866 @@
//! # Tiered compaction algorithm.
//!
//! Read all the input delta files, and write a new set of delta files that
//! include all the input WAL records. See retile_deltas().
//!
//! In a "normal" LSM tree, you get to remove any values that are overwritten by
//! later values, but in our system, we keep all the history. So the reshuffling
//! doesn't remove any garbage, it just reshuffles the records to reduce read
//! amplification, i.e. the number of files that you need to access to find the
//! WAL records for a given key.
//!
//! If the new delta files would be very "narrow", i.e. each file would cover
//! only a narrow key range, then we create a new set of image files
//! instead. The current threshold is that if the estimated total size of the
//! image layers is smaller than the size of the deltas, then we create image
//! layers. That amounts to 2x storage amplification, and it means that the
//! distance of image layers in LSN dimension is roughly equal to the logical
//! database size. For example, if the logical database size is 10 GB, we would
//! generate new image layers every 10 GB of WAL.
use futures::StreamExt;
use tracing::{debug, info};
use std::collections::{HashSet, VecDeque};
use std::ops::Range;
use crate::helpers::{accum_key_values, keyspace_total_size, merge_delta_keys, overlaps_with};
use crate::interface::*;
use utils::lsn::Lsn;
use crate::identify_levels::identify_level;
/// Main entry point to compaction.
///
/// The starting point is a cutoff LSN (`end_lsn`). The compaction is run on
/// everything below that point, that needs compaction. The cutoff LSN must
/// partition the layers so that there are no layers that span across that
/// LSN. To start compaction at the top of the tree, pass the end LSN of the
/// written last L0 layer.
pub async fn compact_tiered<E: CompactionJobExecutor>(
executor: &mut E,
end_lsn: Lsn,
target_file_size: u64,
fanout: u64,
ctx: &E::RequestContext,
) -> anyhow::Result<()> {
assert!(fanout >= 2);
// Start at L0
let mut current_level_no = 0;
let mut current_level_target_height = target_file_size;
loop {
// end LSN +1 to include possible image layers exactly at 'end_lsn'.
let all_layers = executor
.get_layers(
&(E::Key::MIN..E::Key::MAX),
&(Lsn(u64::MIN)..end_lsn + 1),
ctx,
)
.await?;
info!(
"Compacting L{}, total # of layers: {}",
current_level_no,
all_layers.len()
);
// Identify the range of LSNs that belong to this level. We assume that
// each file in this level span an LSN range up to 1.75x target file
// size. That should give us enough slop that if we created a slightly
// oversized L0 layer, e.g. because flushing the in-memory layer was
// delayed for some reason, we don't consider the oversized layer to
// belong to L1. But not too much slop, that we don't accidentally
// "skip" levels.
let max_height = (current_level_target_height as f64 * 1.75) as u64;
let Some(level) = identify_level(all_layers, end_lsn, max_height).await? else {
break;
};
// Calculate the height of this level. If the # of tiers exceeds the
// fanout parameter, it's time to compact it.
let depth = level.depth();
info!(
"Level {} identified as LSN range {}-{}: depth {}",
current_level_no, level.lsn_range.start, level.lsn_range.end, depth
);
for l in &level.layers {
debug!("LEVEL {} layer: {}", current_level_no, l.short_id());
}
if depth < fanout {
debug!(
level = current_level_no,
depth = depth,
fanout,
"too few deltas to compact"
);
break;
}
compact_level(
&level.lsn_range,
&level.layers,
executor,
target_file_size,
ctx,
)
.await?;
if target_file_size == u64::MAX {
break;
}
current_level_no += 1;
current_level_target_height = current_level_target_height.saturating_mul(fanout);
}
Ok(())
}
async fn compact_level<E: CompactionJobExecutor>(
lsn_range: &Range<Lsn>,
layers: &[E::Layer],
executor: &mut E,
target_file_size: u64,
ctx: &E::RequestContext,
) -> anyhow::Result<bool> {
let mut layer_fragments = Vec::new();
for l in layers {
layer_fragments.push(LayerFragment::new(l.clone()));
}
let mut state = LevelCompactionState {
target_file_size,
_lsn_range: lsn_range.clone(),
layers: layer_fragments,
jobs: Vec::new(),
job_queue: Vec::new(),
next_level: false,
executor,
};
let first_job = CompactionJob {
key_range: E::Key::MIN..E::Key::MAX,
lsn_range: lsn_range.clone(),
strategy: CompactionStrategy::Divide,
input_layers: state
.layers
.iter()
.enumerate()
.map(|i| LayerId(i.0))
.collect(),
completed: false,
};
state.jobs.push(first_job);
state.job_queue.push(JobId(0));
state.execute(ctx).await?;
info!(
"compaction completed! Need to process next level: {}",
state.next_level
);
Ok(state.next_level)
}
/// Blackboard that keeps track of the state of all the jobs and work remaining
struct LevelCompactionState<'a, E>
where
E: CompactionJobExecutor,
{
// parameters
target_file_size: u64,
_lsn_range: Range<Lsn>,
layers: Vec<LayerFragment<E>>,
// job queue
jobs: Vec<CompactionJob<E>>,
job_queue: Vec<JobId>,
/// If false, no need to compact levels below this
next_level: bool,
/// Interface to the outside world
executor: &'a mut E,
}
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct LayerId(usize);
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct JobId(usize);
struct PendingJobSet {
pending: HashSet<JobId>,
completed: HashSet<JobId>,
}
impl PendingJobSet {
fn new() -> Self {
PendingJobSet {
pending: HashSet::new(),
completed: HashSet::new(),
}
}
fn complete_job(&mut self, job_id: JobId) {
self.pending.remove(&job_id);
self.completed.insert(job_id);
}
fn all_completed(&self) -> bool {
self.pending.is_empty()
}
}
// When we decide to rewrite a set of layers, LayerFragment is used to keep
// track which new layers supersede an old layer. When all the stakeholder jobs
// have completed, this layer can be deleted.
struct LayerFragment<E>
where
E: CompactionJobExecutor,
{
layer: E::Layer,
// If we will write new layers to replace this one, this keeps track of the
// jobs that need to complete before this layer can be deleted. As the jobs
// complete, they are moved from 'pending' to 'completed' set. Once the
// 'pending' set becomes empty, the layer can be deleted.
//
// If None, this layer is not rewritten and must not be deleted.
deletable_after: Option<PendingJobSet>,
deleted: bool,
}
impl<E> LayerFragment<E>
where
E: CompactionJobExecutor,
{
fn new(layer: E::Layer) -> Self {
LayerFragment {
layer,
deletable_after: None,
deleted: false,
}
}
}
#[derive(PartialEq)]
enum CompactionStrategy {
Divide,
CreateDelta,
CreateImage,
}
#[allow(dead_code)] // Todo
struct CompactionJob<E: CompactionJobExecutor> {
key_range: Range<E::Key>,
lsn_range: Range<Lsn>,
strategy: CompactionStrategy,
input_layers: Vec<LayerId>,
completed: bool,
}
impl<'a, E> LevelCompactionState<'a, E>
where
E: CompactionJobExecutor,
{
/// Main loop of the executor.
///
/// In each iteration, we take the next job from the queue, and execute it.
/// The execution might add new jobs to the queue. Keep going until the
/// queue is empty.
///
/// Initially, the job queue consists of one Divide job over the whole
/// level. On first call, it is divided into smaller jobs.
async fn execute(&mut self, ctx: &E::RequestContext) -> anyhow::Result<()> {
// TODO: this would be pretty straightforward to parallelize with FuturesUnordered
while let Some(next_job_id) = self.job_queue.pop() {
info!("executing job {}", next_job_id.0);
self.execute_job(next_job_id, ctx).await?;
}
// all done!
Ok(())
}
async fn execute_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
let job = &self.jobs[job_id.0];
match job.strategy {
CompactionStrategy::Divide => {
self.divide_job(job_id, ctx).await?;
Ok(())
}
CompactionStrategy::CreateDelta => {
let mut deltas: Vec<E::DeltaLayer> = Vec::new();
let mut layer_ids: Vec<LayerId> = Vec::new();
for layer_id in &job.input_layers {
let layer = &self.layers[layer_id.0].layer;
if let Some(dl) = self.executor.downcast_delta_layer(layer).await? {
deltas.push(dl.clone());
layer_ids.push(*layer_id);
}
}
self.executor
.create_delta(&job.lsn_range, &job.key_range, &deltas, ctx)
.await?;
self.jobs[job_id.0].completed = true;
// did we complete any fragments?
for layer_id in layer_ids {
let l = &mut self.layers[layer_id.0];
if let Some(deletable_after) = l.deletable_after.as_mut() {
deletable_after.complete_job(job_id);
if deletable_after.all_completed() {
self.executor.delete_layer(&l.layer, ctx).await?;
l.deleted = true;
}
}
}
self.next_level = true;
Ok(())
}
CompactionStrategy::CreateImage => {
self.executor
.create_image(job.lsn_range.end, &job.key_range, ctx)
.await?;
self.jobs[job_id.0].completed = true;
// TODO: we could check if any layers < PITR horizon became deletable
Ok(())
}
}
}
fn push_job(&mut self, job: CompactionJob<E>) -> JobId {
let job_id = JobId(self.jobs.len());
self.jobs.push(job);
self.job_queue.push(job_id);
job_id
}
/// Take a partition of the key space, and decide how to compact it.
///
/// TODO: Currently, this is called exactly once for the level, and we
/// decide whether to create new image layers to cover the whole level, or
/// write a new set of delta. In the future, this should try to partition
/// the key space, and make the decision separately for each partition.
async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
let job = &self.jobs[job_id.0];
assert!(job.strategy == CompactionStrategy::Divide);
// Check for dummy cases
if job.input_layers.is_empty() {
return Ok(());
}
let job = &self.jobs[job_id.0];
assert!(job.strategy == CompactionStrategy::Divide);
// Would it be better to create images for this partition?
// Decide based on the average density of the level
let keyspace_size = keyspace_total_size(
&self
.executor
.get_keyspace(&job.key_range, job.lsn_range.end, ctx)
.await?,
) * 8192;
let wal_size = job
.input_layers
.iter()
.filter(|layer_id| self.layers[layer_id.0].layer.is_delta())
.map(|layer_id| self.layers[layer_id.0].layer.file_size())
.sum::<u64>();
if keyspace_size < wal_size {
// seems worth it
info!(
"covering with images, because keyspace_size is {}, size of deltas between {}-{} is {}",
keyspace_size, job.lsn_range.start, job.lsn_range.end, wal_size
);
self.cover_with_images(job_id, ctx).await
} else {
// do deltas
info!(
"coverage not worth it, keyspace_size {}, wal_size {}",
keyspace_size, wal_size
);
self.retile_deltas(job_id, ctx).await
}
}
// LSN
// ^
// |
// | ###|###|#####
// | +--+-----+--+ +--+-----+--+
// | | | | | | | | |
// | +--+--+--+--+ +--+--+--+--+
// | | | | | | |
// | +---+-+-+---+ ==> +---+-+-+---+
// | | | | | | | | |
// | +---+-+-++--+ +---+-+-++--+
// | | | | | | | | |
// | +-----+--+--+ +-----+--+--+
// |
// +--------------> key
//
async fn cover_with_images(
&mut self,
job_id: JobId,
ctx: &E::RequestContext,
) -> anyhow::Result<()> {
let job = &self.jobs[job_id.0];
assert!(job.strategy == CompactionStrategy::Divide);
// XXX: do we still need the "holes" stuff?
let mut new_jobs = Vec::new();
// Slide a window through the keyspace
let keyspace = self
.executor
.get_keyspace(&job.key_range, job.lsn_range.end, ctx)
.await?;
let mut window = KeyspaceWindow::new(
E::Key::MIN..E::Key::MAX,
keyspace,
self.target_file_size / 8192,
);
while let Some(key_range) = window.choose_next_image() {
new_jobs.push(CompactionJob::<E> {
key_range,
lsn_range: job.lsn_range.clone(),
strategy: CompactionStrategy::CreateImage,
input_layers: Vec::new(), // XXX: Is it OK for this to be empty for image layer?
completed: false,
});
}
for j in new_jobs.into_iter().rev() {
let _job_id = self.push_job(j);
// TODO: image layers don't let us delete anything. unless < PITR horizon
//let j = &self.jobs[job_id.0];
// for layer_id in j.input_layers.iter() {
// self.layers[layer_id.0].pending_stakeholders.insert(job_id);
//}
}
Ok(())
}
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// We split the new delta layers on the key dimension. We iterate through
// the key space, and for each key, check if including the next key to the
// current output layer we're building would cause the layer to become too
// large. If so, dump the current output layer and start new one. It's
// possible that there is a single key with so many page versions that
// storing all of them in a single layer file would be too large. In that
// case, we also split on the LSN dimension.
//
// LSN
// ^
// |
// | +-----------+ +--+--+--+--+
// | | | | | | | |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ ==> | | | | |
// | | | | | | | |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ +--+--+--+--+
// |
// +--------------> key
//
//
// If one key (X) has a lot of page versions:
//
// LSN
// ^
// | (X)
// | +-----------+ +--+--+--+--+
// | | | | | | | |
// | +-----------+ | | +--+ |
// | | | | | | | |
// | +-----------+ ==> | | | | |
// | | | | | +--+ |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ +--+--+--+--+
// |
// +--------------> key
//
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
async fn retile_deltas(
&mut self,
job_id: JobId,
ctx: &E::RequestContext,
) -> anyhow::Result<()> {
let job = &self.jobs[job_id.0];
assert!(job.strategy == CompactionStrategy::Divide);
// Sweep the key space left to right, running an estimate of how much
// disk size and keyspace we have accumulated
//
// Once the disk size reaches the target threshold, stop and think.
// If we have accumulated only a narrow band of keyspace, create an
// image layer. Otherwise write a delta layer.
// FIXME: deal with the case of lots of values for same key
// FIXME: we are ignoring images here. Did we already divide the work
// so that we won't encounter them here?
let mut deltas: Vec<E::DeltaLayer> = Vec::new();
for layer_id in &job.input_layers {
let l = &self.layers[layer_id.0];
if let Some(dl) = self.executor.downcast_delta_layer(&l.layer).await? {
deltas.push(dl.clone());
}
}
// Open stream
let key_value_stream = std::pin::pin!(merge_delta_keys::<E>(deltas.as_slice(), ctx));
let mut new_jobs = Vec::new();
// Slide a window through the keyspace
let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
let mut all_in_window: bool = false;
let mut window = Window::new();
loop {
if all_in_window && window.elems.is_empty() {
// All done!
break;
}
if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
{
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
.filter(|layer_id| {
overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
})
.cloned()
.collect();
assert!(!batch_layers.is_empty());
new_jobs.push(CompactionJob {
key_range,
lsn_range: job.lsn_range.clone(),
strategy: CompactionStrategy::CreateDelta,
input_layers: batch_layers,
completed: false,
});
} else {
assert!(!all_in_window);
if let Some(next_key) = key_accum.next().await.transpose()? {
window.feed(next_key.key, next_key.size);
} else {
all_in_window = true;
}
}
}
// All the input files are rewritten. Set up the tracking for when they can
// be deleted.
for layer_id in job.input_layers.iter() {
let l = &mut self.layers[layer_id.0];
assert!(l.deletable_after.is_none());
l.deletable_after = Some(PendingJobSet::new());
}
for j in new_jobs.into_iter().rev() {
let job_id = self.push_job(j);
let j = &self.jobs[job_id.0];
for layer_id in j.input_layers.iter() {
self.layers[layer_id.0]
.deletable_after
.as_mut()
.unwrap()
.pending
.insert(job_id);
}
}
Ok(())
}
}
// Sliding window through keyspace and values
// This is used by over_with_images to decide on good split points
struct KeyspaceWindow<K> {
head: KeyspaceWindowHead<K>,
start_pos: KeyspaceWindowPos<K>,
}
struct KeyspaceWindowHead<K> {
// overall key range to cover
key_range: Range<K>,
keyspace: Vec<Range<K>>,
target_keysize: u64,
}
#[derive(Clone)]
struct KeyspaceWindowPos<K> {
end_key: K,
keyspace_idx: usize,
accum_keysize: u64,
}
impl<K: CompactionKey> KeyspaceWindowPos<K> {
fn reached_end(&self, w: &KeyspaceWindowHead<K>) -> bool {
self.keyspace_idx == w.keyspace.len()
}
// Advance the cursor until it reaches 'target_keysize'.
fn advance_until_size(&mut self, w: &KeyspaceWindowHead<K>, max_size: u64) {
while self.accum_keysize < max_size && !self.reached_end(w) {
let curr_range = &w.keyspace[self.keyspace_idx];
if self.end_key < curr_range.start {
// skip over any unused space
self.end_key = curr_range.start;
}
// We're now within 'curr_range'. Can we advance past it completely?
let distance = K::key_range_size(&(self.end_key..curr_range.end));
if (self.accum_keysize + distance as u64) < max_size {
// oh yeah, it fits
self.end_key = curr_range.end;
self.keyspace_idx += 1;
self.accum_keysize += distance as u64;
} else {
// advance within the range
let skip_key = self.end_key.skip_some();
let distance = K::key_range_size(&(self.end_key..skip_key));
if (self.accum_keysize + distance as u64) < max_size {
self.end_key = skip_key;
self.accum_keysize += distance as u64;
} else {
self.end_key = self.end_key.next();
self.accum_keysize += 1;
}
}
}
}
}
impl<K> KeyspaceWindow<K>
where
K: CompactionKey,
{
fn new(key_range: Range<K>, keyspace: CompactionKeySpace<K>, target_keysize: u64) -> Self {
assert!(keyspace.first().unwrap().start >= key_range.start);
let start_key = key_range.start;
let start_pos = KeyspaceWindowPos::<K> {
end_key: start_key,
keyspace_idx: 0,
accum_keysize: 0,
};
Self {
head: KeyspaceWindowHead::<K> {
key_range,
keyspace,
target_keysize,
},
start_pos,
}
}
fn choose_next_image(&mut self) -> Option<Range<K>> {
if self.start_pos.keyspace_idx == self.head.keyspace.len() {
// we've reached the end
return None;
}
let mut next_pos = self.start_pos.clone();
next_pos.advance_until_size(
&self.head,
self.start_pos.accum_keysize + self.head.target_keysize,
);
// See if we can gobble up the rest of the keyspace if we stretch out the layer, up to
// 1.25x target size
let mut end_pos = next_pos.clone();
end_pos.advance_until_size(
&self.head,
self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4),
);
if end_pos.reached_end(&self.head) {
// gobble up any unused keyspace between the last used key and end of the range
assert!(end_pos.end_key <= self.head.key_range.end);
end_pos.end_key = self.head.key_range.end;
next_pos = end_pos;
}
let start_key = self.start_pos.end_key;
self.start_pos = next_pos;
Some(start_key..self.start_pos.end_key)
}
}
// Sliding window through keyspace and values
//
// This is used to decide what layer to write next, from the beginning of the window.
//
// Candidates:
//
// 1. Create an image layer, snapping to previous images
// 2. Create a delta layer, snapping to previous images
// 3. Create an image layer, snapping to
//
//
// Take previous partitioning, based on the image layers below.
//
// Candidate is at the front:
//
// Consider stretching an image layer to next divider? If it's close enough,
// that's the image candidate
//
// If it's too far, consider splitting at a reasonable point
//
// Is the image candidate smaller than the equivalent delta? If so,
// split off the image. Otherwise, split off one delta.
// Try to snap off the delta at a reasonable point
struct WindowElement<K> {
start_key: K, // inclusive
last_key: K, // inclusive
accum_size: u64,
}
struct Window<K> {
elems: VecDeque<WindowElement<K>>,
// last key that was split off, inclusive
splitoff_key: Option<K>,
splitoff_size: u64,
}
impl<K> Window<K>
where
K: CompactionKey,
{
fn new() -> Self {
Self {
elems: VecDeque::new(),
splitoff_key: None,
splitoff_size: 0,
}
}
fn feed(&mut self, key: K, size: u64) {
let last_size;
if let Some(last) = self.elems.back_mut() {
assert!(last.last_key <= key);
if key == last.last_key {
last.accum_size += size;
return;
}
last_size = last.accum_size;
} else {
last_size = 0;
}
// This is a new key.
let elem = WindowElement {
start_key: key,
last_key: key,
accum_size: last_size + size,
};
self.elems.push_back(elem);
}
fn remain_size(&self) -> u64 {
self.elems.back().unwrap().accum_size - self.splitoff_size
}
fn peek_size(&self) -> u64 {
self.elems.front().unwrap().accum_size - self.splitoff_size
}
fn commit_upto(&mut self, mut upto: usize) {
while upto > 1 {
let popped = self.elems.pop_front().unwrap();
self.elems.front_mut().unwrap().start_key = popped.start_key;
upto -= 1;
}
}
fn find_size_split(&self, target_size: u64) -> usize {
self.elems
.partition_point(|elem| elem.accum_size - self.splitoff_size < target_size)
}
fn pop(&mut self) {
let first = self.elems.pop_front().unwrap();
self.splitoff_size = first.accum_size;
self.splitoff_key = Some(first.last_key);
}
// the difference between delta and image is that an image covers
// any unused keyspace before and after, while a delta tries to
// minimize that. TODO: difference not implemented
fn pop_delta(&mut self) -> Range<K> {
let first = self.elems.front().unwrap();
let key_range = first.start_key..first.last_key.next();
self.pop();
key_range
}
// Prerequisite: we have enough input in the window
//
// On return None, the caller should feed more data and call again
fn choose_next_delta(&mut self, target_size: u64, has_more: bool) -> Option<Range<K>> {
if has_more && self.elems.is_empty() {
// Starting up
return None;
}
// If we still have an undersized candidate, just keep going
while self.peek_size() < target_size {
if self.elems.len() > 1 {
self.commit_upto(2);
} else if has_more {
return None;
} else {
break;
}
}
// Ensure we have enough input in the window to make a good decision
if has_more && self.remain_size() < target_size * 5 / 4 {
return None;
}
// The candidate on the front is now large enough, for a delta.
// And we have enough data in the window to decide.
// If we're willing to stretch it up to 1.25 target size, could we
// gobble up the rest of the work? This avoids creating very small
// "tail" layers at the end of the keyspace
if !has_more && self.remain_size() < target_size * 5 / 3 {
self.commit_upto(self.elems.len());
} else {
let delta_split_at = self.find_size_split(target_size);
self.commit_upto(delta_split_at);
// If it's still not large enough, request the caller to fill the window
if self.elems.len() == 1 && has_more {
return None;
}
}
Some(self.pop_delta())
}
}

View File

@@ -0,0 +1,243 @@
//! This file contains generic utility functions over the interface types,
//! which could be handy for any compaction implementation.
use crate::interface::*;
use futures::future::BoxFuture;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use pin_project_lite::pin_project;
use std::cmp::Ord;
use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::future::Future;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::task::{ready, Poll};
pub fn keyspace_total_size<K>(keyspace: &CompactionKeySpace<K>) -> u64
where
K: CompactionKey,
{
keyspace.iter().map(|r| K::key_range_size(r) as u64).sum()
}
pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
!(a.end <= b.start || b.end <= a.start)
}
pub fn union_to_keyspace<K: Ord>(a: &mut CompactionKeySpace<K>, b: CompactionKeySpace<K>) {
let x = std::mem::take(a);
let mut all_ranges_iter = [x.into_iter(), b.into_iter()]
.into_iter()
.kmerge_by(|a, b| a.start < b.start);
let mut ranges = Vec::new();
if let Some(first) = all_ranges_iter.next() {
let (mut start, mut end) = (first.start, first.end);
for r in all_ranges_iter {
assert!(r.start >= start);
if r.start > end {
ranges.push(start..end);
start = r.start;
end = r.end;
} else if r.end > end {
end = r.end;
}
}
ranges.push(start..end);
}
*a = ranges
}
pub fn intersect_keyspace<K: Ord + Clone + Copy>(
a: &CompactionKeySpace<K>,
r: &Range<K>,
) -> CompactionKeySpace<K> {
let mut ranges: Vec<Range<K>> = Vec::new();
for x in a.iter() {
if x.end <= r.start {
continue;
}
if x.start >= r.end {
break;
}
ranges.push(x.clone())
}
// trim the ends
if let Some(first) = ranges.first_mut() {
first.start = std::cmp::max(first.start, r.start);
}
if let Some(last) = ranges.last_mut() {
last.end = std::cmp::min(last.end, r.end);
}
ranges
}
/// Create a stream that iterates through all DeltaEntrys among all input
/// layers, in key-lsn order.
///
/// This is public because the create_delta() implementation likely wants to use this too
/// TODO: move to a more shared place
pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
layers: &'a [E::DeltaLayer],
ctx: &'a E::RequestContext,
) -> MergeDeltaKeys<'a, E> {
// Use a binary heap to merge the layers. Each input layer is initially
// represented by a LazyLoadLayer::Unloaded element, which uses the start of
// the layer's key range as the key. The first time a layer reaches the top
// of the heap, all the keys of the layer are loaded into a sorted vector.
//
// This helps to keep the memory usage reasonable: we only need to hold in
// memory the DeltaEntrys of the layers that overlap with the "current" key.
let mut heap: BinaryHeap<LazyLoadLayer<'a, E>> = BinaryHeap::new();
for l in layers {
heap.push(LazyLoadLayer::Unloaded(l));
}
MergeDeltaKeys {
heap,
ctx,
load_future: None,
}
}
enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
Unloaded(&'a E::DeltaLayer),
}
impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
fn key(&self) -> E::Key {
match self {
Self::Loaded(entries) => entries.front().unwrap().key(),
Self::Unloaded(dl) => dl.key_range().start,
}
}
}
impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// reverse order so that we get a min-heap
other.key().cmp(&self.key())
}
}
impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
fn eq(&self, other: &Self) -> bool {
self.key().eq(&other.key())
}
}
impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
type LoadFuture<'a, E> = BoxFuture<'a, anyhow::Result<Vec<E>>>;
// Stream returned by `merge_delta_keys`
pin_project! {
#[allow(clippy::type_complexity)]
pub struct MergeDeltaKeys<'a, E: CompactionJobExecutor> {
heap: BinaryHeap<LazyLoadLayer<'a, E>>,
#[pin]
load_future: Option<LoadFuture<'a, <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>,
ctx: &'a E::RequestContext,
}
}
impl<'a, E> Stream for MergeDeltaKeys<'a, E>
where
E: CompactionJobExecutor + 'a,
{
type Item = anyhow::Result<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::option::Option<<Self as futures::Stream>::Item>> {
let mut this = self.project();
loop {
if let Some(mut load_future) = this.load_future.as_mut().as_pin_mut() {
// We are waiting for loading the keys to finish
match ready!(load_future.as_mut().poll(cx)) {
Ok(entries) => {
this.load_future.set(None);
*this.heap.peek_mut().unwrap() =
LazyLoadLayer::Loaded(VecDeque::from(entries));
}
Err(e) => {
return Poll::Ready(Some(Err(e)));
}
}
}
// If the topmost layer in the heap hasn't been loaded yet, start
// loading it. Otherwise return the next entry from it and update
// the layer's position in the heap (this decreaseKey operation is
// performed implicitly when `top` is dropped).
if let Some(mut top) = this.heap.peek_mut() {
match top.deref_mut() {
LazyLoadLayer::Unloaded(ref mut l) => {
let fut = l.load_keys(this.ctx);
this.load_future.set(Some(fut));
continue;
}
LazyLoadLayer::Loaded(ref mut entries) => {
let result = entries.pop_front().unwrap();
if entries.is_empty() {
std::collections::binary_heap::PeekMut::pop(top);
}
return Poll::Ready(Some(Ok(result)));
}
}
} else {
return Poll::Ready(None);
}
}
}
}
// Accumulate values at key boundaries
pub struct KeySize<K> {
pub key: K,
pub num_values: u64,
pub size: u64,
}
pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
where
K: Eq,
I: Stream<Item = Result<D, E>>,
D: CompactionDeltaEntry<'a, K>,
{
async_stream::try_stream! {
// Initialize the state from the first value
let mut input = std::pin::pin!(input);
if let Some(first) = input.next().await {
let first = first?;
let mut accum: KeySize<K> = KeySize {
key: first.key(),
num_values: 1,
size: first.size(),
};
while let Some(this) = input.next().await {
let this = this?;
if this.key() == accum.key {
accum.size += this.size();
accum.num_values += 1;
} else {
yield accum;
accum = KeySize {
key: this.key(),
num_values: 1,
size: this.size(),
};
}
}
yield accum;
}
}
}

View File

@@ -0,0 +1,376 @@
//! An LSM tree consists of multiple levels, each exponential larger than the
//! previous level. And each level consists of be multiple "tiers". With tiered
//! compaction, a level is compacted when it has accumulated more than N tiers,
//! forming one tier on the next level.
//!
//! In the pageserver, we don't explicitly track the levels and tiers. Instead,
//! we identify them by looking at the shapes of the layers. It's an easy task
//! for a human, but it's not straightforward to come up with the exact
//! rules. Especially if there are cases like interrupted, half-finished
//! compactions, or highly skewed data distributions that have let us "skip"
//! some levels. It's not critical to classify all cases correctly; at worst we
//! delay some compaction work, and suffer from more read amplification, or we
//! perform some unnecessary compaction work.
//!
//! `identify_level` performs that shape-matching.
//!
//! It returns a Level struct, which has `depth()` function to count the number
//! of "tiers" in the level. The tier count is the max depth of stacked layers
//! within the level. That's a good measure, because the point of compacting is
//! to reduce read amplification, and the depth is what determines that.
//!
//! One interesting effect of this is that if we generate very small delta
//! layers at L0, e.g. because the L0 layers are flushed by timeout rather than
//! because they reach the target size, the L0 compaction will combine them to
//! one larger file. But if the combined file is still smaller than the target
//! file size, the file will still be considered to be part of L0 at the next
//! iteration.
use anyhow::bail;
use std::collections::BTreeSet;
use std::ops::Range;
use utils::lsn::Lsn;
use crate::interface::*;
use tracing::{info, trace};
pub struct Level<L> {
pub lsn_range: Range<Lsn>,
pub layers: Vec<L>,
}
/// Identify an LSN > `end_lsn` that partitions the LSN space, so that there are
/// no layers that cross the boundary LSN.
///
/// A further restriction is that all layers in the returned partition cover at
/// most 'lsn_max_size' LSN bytes.
pub async fn identify_level<K, L>(
all_layers: Vec<L>,
end_lsn: Lsn,
lsn_max_size: u64,
) -> anyhow::Result<Option<Level<L>>>
where
K: CompactionKey,
L: CompactionLayer<K> + Clone,
{
// filter out layers that are above the `end_lsn`, they are completely irrelevant.
let mut layers = Vec::new();
for l in all_layers {
if l.lsn_range().start < end_lsn && l.lsn_range().end > end_lsn {
// shouldn't happen. Indicates that the caller passed a bogus
// end_lsn.
bail!("identify_level() called with end_lsn that does not partition the LSN space: end_lsn {} intersects with layer {}", end_lsn, l.short_id());
}
// include image layers sitting exacty at `end_lsn`.
let is_image = !l.is_delta();
if (is_image && l.lsn_range().start > end_lsn)
|| (!is_image && l.lsn_range().start >= end_lsn)
{
continue;
}
layers.push(l);
}
// All the remaining layers either belong to this level, or are below it.
info!(
"identify level at {}, size {}, num layers below: {}",
end_lsn,
lsn_max_size,
layers.len()
);
if layers.is_empty() {
return Ok(None);
}
// Walk the ranges in LSN order.
//
// ----- end_lsn
// |
// |
// v
//
layers.sort_by_key(|l| l.lsn_range().end);
let mut candidate_start_lsn = end_lsn;
let mut candidate_layers: Vec<L> = Vec::new();
let mut current_best_start_lsn = end_lsn;
let mut current_best_layers: Vec<L> = Vec::new();
let mut iter = layers.into_iter();
loop {
let Some(l) = iter.next_back() else {
// Reached end. Accept the last candidate
current_best_start_lsn = candidate_start_lsn;
current_best_layers.extend_from_slice(&std::mem::take(&mut candidate_layers));
break;
};
trace!(
"inspecting {} for candidate {}, current best {}",
l.short_id(),
candidate_start_lsn,
current_best_start_lsn
);
let r = l.lsn_range();
// Image layers don't restrict our choice of cutoff LSN
if l.is_delta() {
// Is this candidate workable? In other words, are there any
// delta layers that span across this LSN
//
// Valid: Not valid:
// + +
// | | +
// + <- candidate + | <- candidate
// + +
// |
// +
if r.end <= candidate_start_lsn {
// Hooray, there are no crossing LSNs. And we have visited
// through all the layers within candidate..end_lsn. The
// current candidate can be accepted.
current_best_start_lsn = r.end;
current_best_layers.extend_from_slice(&std::mem::take(&mut candidate_layers));
candidate_start_lsn = r.start;
}
// Is it small enough to be considered part of this level?
if r.end.0 - r.start.0 > lsn_max_size {
// Too large, this layer belongs to next level. Stop.
trace!(
"too large {}, size {} vs {}",
l.short_id(),
r.end.0 - r.start.0,
lsn_max_size
);
break;
}
// If this crosses the candidate lsn, push it down.
if r.start < candidate_start_lsn {
trace!(
"layer {} prevents from stopping at {}",
l.short_id(),
candidate_start_lsn
);
candidate_start_lsn = r.start;
}
}
// Include this layer in our candidate
candidate_layers.push(l);
}
Ok(if current_best_start_lsn == end_lsn {
// empty level
None
} else {
Some(Level {
lsn_range: current_best_start_lsn..end_lsn,
layers: current_best_layers,
})
})
}
// helper struct used in depth()
struct Event<K> {
key: K,
layer_idx: usize,
start: bool,
}
impl<L> Level<L> {
/// Count the number of deltas stacked on each other.
pub fn depth<K>(&self) -> u64
where
K: CompactionKey,
L: CompactionLayer<K>,
{
let mut events: Vec<Event<K>> = Vec::new();
for (idx, l) in self.layers.iter().enumerate() {
events.push(Event {
key: l.key_range().start,
layer_idx: idx,
start: true,
});
events.push(Event {
key: l.key_range().end,
layer_idx: idx,
start: false,
});
}
events.sort_by_key(|e| (e.key, e.start));
// Sweep the key space left to right. Stop at each distinct key, and
// count the number of deltas on top of the highest image at that key.
//
// This is a little enefficient, as we walk through the active_set on
// every key. We could increment/decrement a counter on each step
// instead, but that'd require a bit more complex bookkeeping.
let mut active_set: BTreeSet<(Lsn, bool, usize)> = BTreeSet::new();
let mut max_depth = 0;
let mut events_iter = events.iter().peekable();
while let Some(e) = events_iter.next() {
let l = &self.layers[e.layer_idx];
let is_image = !l.is_delta();
// update the active set
if e.start {
active_set.insert((l.lsn_range().end, is_image, e.layer_idx));
} else {
active_set.remove(&(l.lsn_range().end, is_image, e.layer_idx));
}
// recalculate depth if this was the last event at this point
let more_events_at_this_key = events_iter
.peek()
.map_or(false, |next_e| next_e.key == e.key);
if !more_events_at_this_key {
let mut active_depth = 0;
for (_end_lsn, is_image, _idx) in active_set.iter().rev() {
if *is_image {
break;
}
active_depth += 1;
}
if active_depth > max_depth {
max_depth = active_depth;
}
}
}
max_depth
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::simulator::{Key, MockDeltaLayer, MockImageLayer, MockLayer};
use std::sync::{Arc, Mutex};
fn delta(key_range: Range<Key>, lsn_range: Range<Lsn>) -> MockLayer {
MockLayer::Delta(Arc::new(MockDeltaLayer {
key_range,
lsn_range,
// identify_level() doesn't pay attention to the rest of the fields
file_size: 0,
deleted: Mutex::new(false),
records: vec![],
}))
}
fn image(key_range: Range<Key>, lsn: Lsn) -> MockLayer {
MockLayer::Image(Arc::new(MockImageLayer {
key_range,
lsn_range: lsn..(lsn + 1),
// identify_level() doesn't pay attention to the rest of the fields
file_size: 0,
deleted: Mutex::new(false),
}))
}
#[tokio::test]
async fn test_identify_level() -> anyhow::Result<()> {
let layers = vec![
delta(Key::MIN..Key::MAX, Lsn(0x8000)..Lsn(0x9000)),
delta(Key::MIN..Key::MAX, Lsn(0x5000)..Lsn(0x7000)),
delta(Key::MIN..Key::MAX, Lsn(0x4000)..Lsn(0x5000)),
delta(Key::MIN..Key::MAX, Lsn(0x3000)..Lsn(0x4000)),
delta(Key::MIN..Key::MAX, Lsn(0x2000)..Lsn(0x3000)),
delta(Key::MIN..Key::MAX, Lsn(0x1000)..Lsn(0x2000)),
];
// All layers fit in the max file size
let level = identify_level(layers.clone(), Lsn(0x10000), 0x2000)
.await?
.unwrap();
assert_eq!(level.depth(), 6);
// Same LSN with smaller max file size. The second layer from the top is larger
// and belongs to next level.
let level = identify_level(layers.clone(), Lsn(0x10000), 0x1000)
.await?
.unwrap();
assert_eq!(level.depth(), 1);
// Call with a smaller LSN
let level = identify_level(layers.clone(), Lsn(0x3000), 0x1000)
.await?
.unwrap();
assert_eq!(level.depth(), 2);
// Call with an LSN that doesn't partition the space
let result = identify_level(layers, Lsn(0x6000), 0x1000).await;
assert!(result.is_err());
Ok(())
}
#[tokio::test]
async fn test_overlapping_lsn_ranges() -> anyhow::Result<()> {
// The files LSN ranges overlap, so even though there are more files that
// fit under the file size, they are not included in the level because they
// overlap so that we'd need to include the oldest file, too, which is
// larger
let layers = vec![
delta(Key::MIN..Key::MAX, Lsn(0x4000)..Lsn(0x5000)),
delta(Key::MIN..Key::MAX, Lsn(0x3000)..Lsn(0x4000)), // overlap
delta(Key::MIN..Key::MAX, Lsn(0x2500)..Lsn(0x3500)), // overlap
delta(Key::MIN..Key::MAX, Lsn(0x2000)..Lsn(0x3000)), // overlap
delta(Key::MIN..Key::MAX, Lsn(0x1000)..Lsn(0x2500)), // larger
];
let level = identify_level(layers.clone(), Lsn(0x10000), 0x1000)
.await?
.unwrap();
assert_eq!(level.depth(), 1);
Ok(())
}
#[tokio::test]
async fn test_depth_nonoverlapping() -> anyhow::Result<()> {
// The key ranges don't overlap, so depth is only 1.
let layers = vec![
delta(4000..5000, Lsn(0x6000)..Lsn(0x7000)),
delta(3000..4000, Lsn(0x7000)..Lsn(0x8000)),
delta(1000..2000, Lsn(0x8000)..Lsn(0x9000)),
];
let level = identify_level(layers.clone(), Lsn(0x10000), 0x2000)
.await?
.unwrap();
assert_eq!(level.layers.len(), 3);
assert_eq!(level.depth(), 1);
// Staggered. The 1st and 3rd layer don't overlap with each other.
let layers = vec![
delta(1000..2000, Lsn(0x8000)..Lsn(0x9000)),
delta(1500..2500, Lsn(0x7000)..Lsn(0x8000)),
delta(2000..3000, Lsn(0x6000)..Lsn(0x7000)),
];
let level = identify_level(layers.clone(), Lsn(0x10000), 0x2000)
.await?
.unwrap();
assert_eq!(level.layers.len(), 3);
assert_eq!(level.depth(), 2);
Ok(())
}
#[tokio::test]
async fn test_depth_images() -> anyhow::Result<()> {
let layers: Vec<MockLayer> = vec![
delta(1000..2000, Lsn(0x8000)..Lsn(0x9000)),
delta(1500..2500, Lsn(0x7000)..Lsn(0x8000)),
delta(2000..3000, Lsn(0x6000)..Lsn(0x7000)),
// This covers the same key range as the 2nd delta layer. The depth
// in that key range is therefore 0.
image(1500..2500, Lsn(0x9000)),
];
let level = identify_level(layers.clone(), Lsn(0x10000), 0x2000)
.await?
.unwrap();
assert_eq!(level.layers.len(), 4);
assert_eq!(level.depth(), 1);
Ok(())
}
}

View File

@@ -0,0 +1,167 @@
//! This is what the compaction implementation needs to know about
//! layers, keyspace etc.
//!
//! All the heavy lifting is done by the create_image and create_delta
//! functions that the implementor provides.
use async_trait::async_trait;
use pageserver_api::{key::Key, keyspace::key_range_size};
use std::ops::Range;
use utils::lsn::Lsn;
/// Public interface. This is the main thing that the implementor needs to provide
#[async_trait]
pub trait CompactionJobExecutor {
// Type system.
//
// We assume that there are two kinds of layers, deltas and images. The
// compaction doesn't distinguish whether they are stored locally or
// remotely.
//
// The keyspace is defined by CompactionKey trait.
//
type Key: CompactionKey;
type Layer: CompactionLayer<Self::Key> + Clone;
type DeltaLayer: CompactionDeltaLayer<Self> + Clone;
type ImageLayer: CompactionImageLayer<Self> + Clone;
// This is passed through to all the interface functions. The compaction
// implementation doesn't do anything with it, but it might be useful for
// the interface implementation.
type RequestContext: CompactionRequestContext;
// ----
// Functions that the planner uses to support its decisions
// ----
/// Return all layers that overlap the given bounding box.
async fn get_layers(
&mut self,
key_range: &Range<Self::Key>,
lsn_range: &Range<Lsn>,
ctx: &Self::RequestContext,
) -> anyhow::Result<Vec<Self::Layer>>;
async fn get_keyspace(
&mut self,
key_range: &Range<Self::Key>,
lsn: Lsn,
ctx: &Self::RequestContext,
) -> anyhow::Result<CompactionKeySpace<Self::Key>>;
/// NB: This is a pretty expensive operation. In the real pageserver
/// implementation, it downloads the layer, and keeps it resident
/// until the DeltaLayer is dropped.
async fn downcast_delta_layer(
&self,
layer: &Self::Layer,
) -> anyhow::Result<Option<Self::DeltaLayer>>;
// ----
// Functions to execute the plan
// ----
/// Create a new image layer, materializing all the values in the key range,
/// at given 'lsn'.
async fn create_image(
&mut self,
lsn: Lsn,
key_range: &Range<Self::Key>,
ctx: &Self::RequestContext,
) -> anyhow::Result<()>;
/// Create a new delta layer, containing all the values from 'input_layers'
/// in the given key and LSN range.
async fn create_delta(
&mut self,
lsn_range: &Range<Lsn>,
key_range: &Range<Self::Key>,
input_layers: &[Self::DeltaLayer],
ctx: &Self::RequestContext,
) -> anyhow::Result<()>;
/// Delete a layer. The compaction implementation will call this only after
/// all the create_image() or create_delta() calls that deletion of this
/// layer depends on have finished. But if the implementor has extra lazy
/// background tasks, like uploading the index json file to remote storage,
/// it is the implementation's responsibility to track those.
async fn delete_layer(
&mut self,
layer: &Self::Layer,
ctx: &Self::RequestContext,
) -> anyhow::Result<()>;
}
pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display {
const MIN: Self;
const MAX: Self;
/// Calculate distance between key_range.start and key_range.end.
///
/// This returns u32, for compatibility with Repository::key. If the
/// distance is larger, return u32::MAX.
fn key_range_size(key_range: &Range<Self>) -> u32;
// return "self + 1"
fn next(&self) -> Self;
// return "self + <some decent amount to skip>". The amount to skip
// is left to the implementation.
// FIXME: why not just "add(u32)" ? This is hard to use
fn skip_some(&self) -> Self;
}
impl CompactionKey for Key {
const MIN: Self = Self::MIN;
const MAX: Self = Self::MAX;
fn key_range_size(r: &std::ops::Range<Self>) -> u32 {
key_range_size(r)
}
fn next(&self) -> Key {
(self as &Key).next()
}
fn skip_some(&self) -> Key {
self.add(128)
}
}
/// Contiguous ranges of keys that belong to the key space. In key order, and
/// with no overlap.
pub type CompactionKeySpace<K> = Vec<Range<K>>;
/// Functions needed from all layers.
pub trait CompactionLayer<K: CompactionKey + ?Sized> {
fn key_range(&self) -> &Range<K>;
fn lsn_range(&self) -> &Range<Lsn>;
fn file_size(&self) -> u64;
/// For debugging, short human-readable representation of the layer. E.g. filename.
fn short_id(&self) -> String;
fn is_delta(&self) -> bool;
}
#[async_trait]
pub trait CompactionDeltaLayer<E: CompactionJobExecutor + ?Sized>: CompactionLayer<E::Key> {
type DeltaEntry<'a>: CompactionDeltaEntry<'a, E::Key>
where
Self: 'a;
/// Return all keys in this delta layer.
async fn load_keys<'a>(
&self,
ctx: &E::RequestContext,
) -> anyhow::Result<Vec<Self::DeltaEntry<'_>>>;
}
pub trait CompactionImageLayer<E: CompactionJobExecutor + ?Sized>: CompactionLayer<E::Key> {}
pub trait CompactionDeltaEntry<'a, K> {
fn key(&self) -> K;
fn lsn(&self) -> Lsn;
fn size(&self) -> u64;
}
pub trait CompactionRequestContext {}

View File

@@ -0,0 +1,12 @@
// The main module implementing the compaction algorithm
pub mod compact_tiered;
pub(crate) mod identify_levels;
// Traits that the caller of the compaction needs to implement
pub mod interface;
// Utility functions, useful for the implementation
pub mod helpers;
// A simulator with mock implementations of 'interface'
pub mod simulator;

View File

@@ -0,0 +1,613 @@
mod draw;
use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
use async_trait::async_trait;
use futures::StreamExt;
use rand::Rng;
use tracing::info;
use utils::lsn::Lsn;
use std::fmt::Write;
use std::ops::Range;
use std::sync::Arc;
use std::sync::Mutex;
use crate::helpers::{merge_delta_keys, overlaps_with};
use crate::interface;
use crate::interface::CompactionLayer;
//
// Implementation for the CompactionExecutor interface
//
pub struct MockTimeline {
// Parameters for the compaction algorithm
pub target_file_size: u64,
tiers_per_level: u64,
num_l0_flushes: u64,
last_compact_at_flush: u64,
last_flush_lsn: Lsn,
// In-memory layer
records: Vec<MockRecord>,
total_len: u64,
start_lsn: Lsn,
end_lsn: Lsn,
// Current keyspace at `end_lsn`. This is updated on every ingested record.
keyspace: KeySpace,
// historic keyspaces
old_keyspaces: Vec<(Lsn, KeySpace)>,
// "on-disk" layers
pub live_layers: Vec<MockLayer>,
num_deleted_layers: u64,
// Statistics
wal_ingested: u64,
bytes_written: u64,
bytes_deleted: u64,
layers_created: u64,
layers_deleted: u64,
// All the events - creation and deletion of files - are collected
// in 'history'. It is used to draw the SVG animation at the end.
time: u64,
history: Vec<draw::LayerTraceEvent>,
}
type KeySpace = interface::CompactionKeySpace<Key>;
pub struct MockRequestContext {}
impl interface::CompactionRequestContext for MockRequestContext {}
pub type Key = u64;
impl interface::CompactionKey for Key {
const MIN: Self = u64::MIN;
const MAX: Self = u64::MAX;
fn key_range_size(key_range: &Range<Self>) -> u32 {
std::cmp::min(key_range.end - key_range.start, u32::MAX as u64) as u32
}
fn next(&self) -> Self {
self + 1
}
fn skip_some(&self) -> Self {
// round up to next xx
self + 100
}
}
#[derive(Clone)]
pub struct MockRecord {
lsn: Lsn,
key: Key,
len: u64,
}
impl interface::CompactionDeltaEntry<'_, Key> for MockRecord {
fn key(&self) -> Key {
self.key
}
fn lsn(&self) -> Lsn {
self.lsn
}
fn size(&self) -> u64 {
self.len
}
}
pub struct MockDeltaLayer {
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
pub file_size: u64,
pub deleted: Mutex<bool>,
pub records: Vec<MockRecord>,
}
impl interface::CompactionLayer<Key> for Arc<MockDeltaLayer> {
fn key_range(&self) -> &Range<Key> {
&self.key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.lsn_range
}
fn file_size(&self) -> u64 {
self.file_size
}
fn short_id(&self) -> String {
format!(
"{:016X}-{:016X}__{:08X}-{:08X}",
self.key_range.start, self.key_range.end, self.lsn_range.start.0, self.lsn_range.end.0
)
}
fn is_delta(&self) -> bool {
true
}
}
#[async_trait]
impl interface::CompactionDeltaLayer<MockTimeline> for Arc<MockDeltaLayer> {
type DeltaEntry<'a> = MockRecord;
async fn load_keys<'a>(&self, _ctx: &MockRequestContext) -> anyhow::Result<Vec<MockRecord>> {
Ok(self.records.clone())
}
}
pub struct MockImageLayer {
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
pub file_size: u64,
pub deleted: Mutex<bool>,
}
impl interface::CompactionImageLayer<MockTimeline> for Arc<MockImageLayer> {}
impl interface::CompactionLayer<Key> for Arc<MockImageLayer> {
fn key_range(&self) -> &Range<Key> {
&self.key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.lsn_range
}
fn file_size(&self) -> u64 {
self.file_size
}
fn short_id(&self) -> String {
format!(
"{:016X}-{:016X}__{:08X}",
self.key_range.start, self.key_range.end, self.lsn_range.start.0,
)
}
fn is_delta(&self) -> bool {
false
}
}
impl MockTimeline {
pub fn new() -> Self {
MockTimeline {
target_file_size: 256 * 1024 * 1024,
tiers_per_level: 4,
num_l0_flushes: 0,
last_compact_at_flush: 0,
last_flush_lsn: Lsn(0),
records: Vec::new(),
total_len: 0,
start_lsn: Lsn(1000),
end_lsn: Lsn(1000),
keyspace: KeySpace::new(),
old_keyspaces: vec![],
live_layers: vec![],
num_deleted_layers: 0,
wal_ingested: 0,
bytes_written: 0,
bytes_deleted: 0,
layers_created: 0,
layers_deleted: 0,
time: 0,
history: Vec::new(),
}
}
pub async fn compact(&mut self) -> anyhow::Result<()> {
let ctx = MockRequestContext {};
crate::compact_tiered::compact_tiered(
self,
self.last_flush_lsn,
self.target_file_size,
self.tiers_per_level,
&ctx,
)
.await?;
Ok(())
}
// Ingest one record to the timeline
pub fn ingest_record(&mut self, key: Key, len: u64) {
self.records.push(MockRecord {
lsn: self.end_lsn,
key,
len,
});
self.total_len += len;
self.end_lsn += len;
if self.total_len > self.target_file_size {
self.flush_l0();
}
}
pub async fn compact_if_needed(&mut self) -> anyhow::Result<()> {
if self.num_l0_flushes - self.last_compact_at_flush >= self.tiers_per_level {
self.compact().await?;
self.last_compact_at_flush = self.num_l0_flushes;
}
Ok(())
}
pub fn flush_l0(&mut self) {
if self.records.is_empty() {
return;
}
let mut records = std::mem::take(&mut self.records);
records.sort_by_key(|rec| rec.key);
let lsn_range = self.start_lsn..self.end_lsn;
let new_layer = Arc::new(MockDeltaLayer {
key_range: Key::MIN..Key::MAX,
lsn_range: lsn_range.clone(),
file_size: self.total_len,
records,
deleted: Mutex::new(false),
});
info!("flushed L0 layer {}", new_layer.short_id());
self.live_layers.push(MockLayer::from(&new_layer));
// reset L0
self.start_lsn = self.end_lsn;
self.total_len = 0;
self.records = Vec::new();
self.layers_created += 1;
self.bytes_written += new_layer.file_size;
self.time += 1;
self.history.push(LayerTraceEvent {
time_rel: self.time,
op: LayerTraceOp::Flush,
file: LayerTraceFile {
filename: new_layer.short_id(),
key_range: new_layer.key_range.clone(),
lsn_range: new_layer.lsn_range.clone(),
},
});
self.num_l0_flushes += 1;
self.last_flush_lsn = self.end_lsn;
}
// Ingest `num_records' records to the timeline, with random keys
// uniformly distributed in `key_range`
pub fn ingest_uniform(
&mut self,
num_records: u64,
len: u64,
key_range: &Range<Key>,
) -> anyhow::Result<()> {
crate::helpers::union_to_keyspace(&mut self.keyspace, vec![key_range.clone()]);
let mut rng = rand::thread_rng();
for _ in 0..num_records {
self.ingest_record(rng.gen_range(key_range.clone()), len);
self.wal_ingested += len;
}
Ok(())
}
pub fn stats(&self) -> anyhow::Result<String> {
let mut s = String::new();
writeln!(s, "STATISTICS:")?;
writeln!(
s,
"WAL ingested: {:>10} MB",
self.wal_ingested / (1024 * 1024)
)?;
writeln!(
s,
"size created: {:>10} MB",
self.bytes_written / (1024 * 1024)
)?;
writeln!(
s,
"size deleted: {:>10} MB",
self.bytes_deleted / (1024 * 1024)
)?;
writeln!(s, "files created: {:>10}", self.layers_created)?;
writeln!(s, "files deleted: {:>10}", self.layers_deleted)?;
writeln!(
s,
"write amp: {:>10.2}",
self.bytes_written as f64 / self.wal_ingested as f64
)?;
writeln!(
s,
"storage amp: {:>10.2}",
(self.bytes_written - self.bytes_deleted) as f64 / self.wal_ingested as f64
)?;
Ok(s)
}
pub fn draw_history<W: std::io::Write>(&self, output: W) -> anyhow::Result<()> {
draw::draw_history(&self.history, output)
}
}
impl Default for MockTimeline {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone)]
pub enum MockLayer {
Delta(Arc<MockDeltaLayer>),
Image(Arc<MockImageLayer>),
}
impl interface::CompactionLayer<Key> for MockLayer {
fn key_range(&self) -> &Range<Key> {
match self {
MockLayer::Delta(this) => this.key_range(),
MockLayer::Image(this) => this.key_range(),
}
}
fn lsn_range(&self) -> &Range<Lsn> {
match self {
MockLayer::Delta(this) => this.lsn_range(),
MockLayer::Image(this) => this.lsn_range(),
}
}
fn file_size(&self) -> u64 {
match self {
MockLayer::Delta(this) => this.file_size(),
MockLayer::Image(this) => this.file_size(),
}
}
fn short_id(&self) -> String {
match self {
MockLayer::Delta(this) => this.short_id(),
MockLayer::Image(this) => this.short_id(),
}
}
fn is_delta(&self) -> bool {
match self {
MockLayer::Delta(_) => true,
MockLayer::Image(_) => false,
}
}
}
impl MockLayer {
fn is_deleted(&self) -> bool {
let guard = match self {
MockLayer::Delta(this) => this.deleted.lock().unwrap(),
MockLayer::Image(this) => this.deleted.lock().unwrap(),
};
*guard
}
fn mark_deleted(&self) {
let mut deleted_guard = match self {
MockLayer::Delta(this) => this.deleted.lock().unwrap(),
MockLayer::Image(this) => this.deleted.lock().unwrap(),
};
assert!(!*deleted_guard, "layer already deleted");
*deleted_guard = true;
}
}
impl From<&Arc<MockDeltaLayer>> for MockLayer {
fn from(l: &Arc<MockDeltaLayer>) -> Self {
MockLayer::Delta(l.clone())
}
}
impl From<&Arc<MockImageLayer>> for MockLayer {
fn from(l: &Arc<MockImageLayer>) -> Self {
MockLayer::Image(l.clone())
}
}
#[async_trait]
impl interface::CompactionJobExecutor for MockTimeline {
type Key = Key;
type Layer = MockLayer;
type DeltaLayer = Arc<MockDeltaLayer>;
type ImageLayer = Arc<MockImageLayer>;
type RequestContext = MockRequestContext;
async fn get_layers(
&mut self,
key_range: &Range<Self::Key>,
lsn_range: &Range<Lsn>,
_ctx: &Self::RequestContext,
) -> anyhow::Result<Vec<Self::Layer>> {
// Clear any deleted layers from our vec
self.live_layers.retain(|l| !l.is_deleted());
let layers: Vec<MockLayer> = self
.live_layers
.iter()
.filter(|l| {
overlaps_with(l.lsn_range(), lsn_range) && overlaps_with(l.key_range(), key_range)
})
.cloned()
.collect();
Ok(layers)
}
async fn get_keyspace(
&mut self,
key_range: &Range<Self::Key>,
_lsn: Lsn,
_ctx: &Self::RequestContext,
) -> anyhow::Result<interface::CompactionKeySpace<Key>> {
// find it in the levels
if self.old_keyspaces.is_empty() {
Ok(crate::helpers::intersect_keyspace(
&self.keyspace,
key_range,
))
} else {
// not implemented
// The mock implementation only allows requesting the
// keyspace at the level's end LSN. That's all that the
// current implementation needs.
panic!("keyspace not available for requested lsn");
}
}
async fn downcast_delta_layer(
&self,
layer: &MockLayer,
) -> anyhow::Result<Option<Arc<MockDeltaLayer>>> {
Ok(match layer {
MockLayer::Delta(l) => Some(l.clone()),
MockLayer::Image(_) => None,
})
}
async fn create_image(
&mut self,
lsn: Lsn,
key_range: &Range<Key>,
ctx: &MockRequestContext,
) -> anyhow::Result<()> {
let keyspace = self.get_keyspace(key_range, lsn, ctx).await?;
let mut accum_size: u64 = 0;
for r in keyspace {
accum_size += r.end - r.start;
}
let new_layer = Arc::new(MockImageLayer {
key_range: key_range.clone(),
lsn_range: lsn..lsn,
file_size: accum_size * 8192,
deleted: Mutex::new(false),
});
info!(
"created image layer, size {}: {}",
new_layer.file_size,
new_layer.short_id()
);
self.live_layers.push(MockLayer::Image(new_layer.clone()));
// update stats
self.bytes_written += new_layer.file_size;
self.layers_created += 1;
self.time += 1;
self.history.push(LayerTraceEvent {
time_rel: self.time,
op: LayerTraceOp::CreateImage,
file: LayerTraceFile {
filename: new_layer.short_id(),
key_range: new_layer.key_range.clone(),
lsn_range: new_layer.lsn_range.clone(),
},
});
Ok(())
}
async fn create_delta(
&mut self,
lsn_range: &Range<Lsn>,
key_range: &Range<Key>,
input_layers: &[Arc<MockDeltaLayer>],
ctx: &MockRequestContext,
) -> anyhow::Result<()> {
let mut key_value_stream =
std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
let mut records: Vec<MockRecord> = Vec::new();
let mut total_len = 2;
while let Some(delta_entry) = key_value_stream.next().await {
let delta_entry: MockRecord = delta_entry?;
if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
total_len += delta_entry.len;
records.push(delta_entry);
}
}
let total_records = records.len();
let new_layer = Arc::new(MockDeltaLayer {
key_range: key_range.clone(),
lsn_range: lsn_range.clone(),
file_size: total_len,
records,
deleted: Mutex::new(false),
});
info!(
"created delta layer, recs {}, size {}: {}",
total_records,
total_len,
new_layer.short_id()
);
self.live_layers.push(MockLayer::Delta(new_layer.clone()));
// update stats
self.bytes_written += total_len;
self.layers_created += 1;
self.time += 1;
self.history.push(LayerTraceEvent {
time_rel: self.time,
op: LayerTraceOp::CreateDelta,
file: LayerTraceFile {
filename: new_layer.short_id(),
key_range: new_layer.key_range.clone(),
lsn_range: new_layer.lsn_range.clone(),
},
});
Ok(())
}
async fn delete_layer(
&mut self,
layer: &Self::Layer,
_ctx: &MockRequestContext,
) -> anyhow::Result<()> {
let layer = std::pin::pin!(layer);
info!("deleting layer: {}", layer.short_id());
self.num_deleted_layers += 1;
self.bytes_deleted += layer.file_size();
layer.mark_deleted();
self.time += 1;
self.history.push(LayerTraceEvent {
time_rel: self.time,
op: LayerTraceOp::Delete,
file: LayerTraceFile {
filename: layer.short_id(),
key_range: layer.key_range().clone(),
lsn_range: layer.lsn_range().clone(),
},
});
Ok(())
}
}

View File

@@ -0,0 +1,411 @@
use super::Key;
use anyhow::Result;
use std::cmp::Ordering;
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
fmt::Write,
ops::Range,
};
use svg_fmt::{rgb, BeginSvg, EndSvg, Fill, Stroke, Style};
use utils::lsn::Lsn;
// Map values to their compressed coordinate - the index the value
// would have in a sorted and deduplicated list of all values.
struct CoordinateMap<T: Ord + Copy> {
map: BTreeMap<T, usize>,
stretch: f32,
}
impl<T: Ord + Copy> CoordinateMap<T> {
fn new(coords: Vec<T>, stretch: f32) -> Self {
let set: BTreeSet<T> = coords.into_iter().collect();
let mut map: BTreeMap<T, usize> = BTreeMap::new();
for (i, e) in set.iter().enumerate() {
map.insert(*e, i);
}
Self { map, stretch }
}
// This assumes that the map contains an exact point for this.
// Use map_inexact for values inbetween
fn map(&self, val: T) -> f32 {
*self.map.get(&val).unwrap() as f32 * self.stretch
}
// the value is still assumed to be within the min/max bounds
// (this is currently unused)
fn _map_inexact(&self, val: T) -> f32 {
let prev = *self.map.range(..=val).next().unwrap().1;
let next = *self.map.range(val..).next().unwrap().1;
// interpolate
(prev as f32 + (next - prev) as f32) * self.stretch
}
fn max(&self) -> f32 {
self.map.len() as f32 * self.stretch
}
}
#[derive(PartialEq, Hash, Eq)]
pub enum LayerTraceOp {
Flush,
CreateDelta,
CreateImage,
Delete,
}
impl std::fmt::Display for LayerTraceOp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
let op_str = match self {
LayerTraceOp::Flush => "flush",
LayerTraceOp::CreateDelta => "create_delta",
LayerTraceOp::CreateImage => "create_image",
LayerTraceOp::Delete => "delete",
};
f.write_str(op_str)
}
}
#[derive(PartialEq, Hash, Eq, Clone)]
pub struct LayerTraceFile {
pub filename: String,
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
}
impl LayerTraceFile {
fn is_image(&self) -> bool {
self.lsn_range.end == self.lsn_range.start
}
}
pub struct LayerTraceEvent {
pub time_rel: u64,
pub op: LayerTraceOp,
pub file: LayerTraceFile,
}
pub fn draw_history<W: std::io::Write>(history: &[LayerTraceEvent], mut output: W) -> Result<()> {
let mut files: Vec<LayerTraceFile> = Vec::new();
for event in history {
files.push(event.file.clone());
}
let last_time_rel = history.last().unwrap().time_rel;
// Collect all coordinates
let mut keys: Vec<Key> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for f in files.iter() {
keys.push(f.key_range.start);
keys.push(f.key_range.end);
lsns.push(f.lsn_range.start);
lsns.push(f.lsn_range.end);
}
// Analyze
let key_map = CoordinateMap::new(keys, 2.0);
// Stretch out vertically for better visibility
let lsn_map = CoordinateMap::new(lsns, 3.0);
let mut svg = String::new();
// Draw
writeln!(
svg,
"{}",
BeginSvg {
w: key_map.max(),
h: lsn_map.max(),
}
)?;
let lsn_max = lsn_map.max();
// Sort the files by LSN, but so that image layers go after all delta layers
// The SVG is painted in the order the elements appear, and we want to draw
// image layers on top of the delta layers if they overlap
//
// (This could also be implemented via z coordinates: image layers get one z
// coord, delta layers get another z coord.)
let mut files_sorted: Vec<LayerTraceFile> = files.into_iter().collect();
files_sorted.sort_by(|a, b| {
if a.is_image() && !b.is_image() {
Ordering::Greater
} else if !a.is_image() && b.is_image() {
Ordering::Less
} else {
a.lsn_range.end.cmp(&b.lsn_range.end)
}
});
writeln!(svg, "<!-- layers -->")?;
let mut files_seen = HashSet::new();
for f in files_sorted {
if files_seen.contains(&f) {
continue;
}
let key_start = key_map.map(f.key_range.start);
let key_end = key_map.map(f.key_range.end);
let key_diff = key_end - key_start;
if key_start >= key_end {
panic!("Invalid key range {}-{}", key_start, key_end);
}
let lsn_start = lsn_map.map(f.lsn_range.start);
let lsn_end = lsn_map.map(f.lsn_range.end);
// Fill in and thicken rectangle if it's an
// image layer so that we can see it.
let mut style = Style::default();
style.fill = Fill::Color(rgb(0x80, 0x80, 0x80));
style.stroke = Stroke::Color(rgb(0, 0, 0), 0.5);
let y_start = lsn_max - lsn_start;
let y_end = lsn_max - lsn_end;
let x_margin = 0.25;
let y_margin = 0.5;
match f.lsn_range.start.cmp(&f.lsn_range.end) {
Ordering::Less => {
write!(
svg,
r#" <rect id="layer_{}" x="{}" y="{}" width="{}" height="{}" ry="{}" style="{}">"#,
f.filename,
key_start + x_margin,
y_end + y_margin,
key_diff - x_margin * 2.0,
y_start - y_end - y_margin * 2.0,
1.0, // border_radius,
style,
)?;
write!(svg, "<title>{}</title>", f.filename)?;
writeln!(svg, "</rect>")?;
}
Ordering::Equal => {
//lsn_diff = 0.3;
//lsn_offset = -lsn_diff / 2.0;
//margin = 0.05;
style.fill = Fill::Color(rgb(0x80, 0, 0x80));
style.stroke = Stroke::Color(rgb(0x80, 0, 0x80), 3.0);
write!(
svg,
r#" <line id="layer_{}" x1="{}" y1="{}" x2="{}" y2="{}" style="{}">"#,
f.filename,
key_start + x_margin,
y_end,
key_end - x_margin,
y_end,
style,
)?;
write!(
svg,
"<title>{}<br>{} - {}</title>",
f.filename, lsn_end, y_end
)?;
writeln!(svg, "</line>")?;
}
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
}
files_seen.insert(f);
}
let mut record_style = Style::default();
record_style.fill = Fill::Color(rgb(0x80, 0x80, 0x80));
record_style.stroke = Stroke::None;
writeln!(svg, "{}", EndSvg)?;
let mut layer_events_str = String::new();
let mut first = true;
for e in history {
if !first {
writeln!(layer_events_str, ",")?;
}
write!(
layer_events_str,
r#" {{"time_rel": {}, "filename": "{}", "op": "{}"}}"#,
e.time_rel, e.file.filename, e.op
)?;
first = false;
}
writeln!(layer_events_str)?;
writeln!(
output,
r#"<!DOCTYPE html>
<html>
<head>
<style>
/* Keep the slider pinned at top */
.topbar {{
display: block;
overflow: hidden;
background-color: lightgrey;
position: fixed;
top: 0;
width: 100%;
/* width: 500px; */
}}
.slidercontainer {{
float: left;
width: 50%;
margin-right: 200px;
}}
.slider {{
float: left;
width: 100%;
}}
.legend {{
width: 200px;
float: right;
}}
/* Main content */
.main {{
margin-top: 50px; /* Add a top margin to avoid content overlay */
}}
</style>
</head>
<body onload="init()">
<script type="text/javascript">
var layer_events = [{layer_events_str}]
let ticker;
function init() {{
for (let i = 0; i < layer_events.length; i++) {{
var layer = document.getElementById("layer_" + layer_events[i].filename);
layer.style.visibility = "hidden";
}}
last_layer_event = -1;
moveSlider(last_slider_pos)
}}
function startAnimation() {{
ticker = setInterval(animateStep, 100);
}}
function stopAnimation() {{
clearInterval(ticker);
}}
function animateStep() {{
if (last_layer_event < layer_events.length - 1) {{
var slider = document.getElementById("time-slider");
let prevPos = slider.value
let nextEvent = last_layer_event + 1
while (nextEvent <= layer_events.length - 1) {{
if (layer_events[nextEvent].time_rel > prevPos) {{
break;
}}
nextEvent += 1;
}}
let nextPos = layer_events[nextEvent].time_rel
slider.value = nextPos
moveSlider(nextPos)
}}
}}
function redoLayerEvent(n, dir) {{
var layer = document.getElementById("layer_" + layer_events[n].filename);
switch (layer_events[n].op) {{
case "flush":
layer.style.visibility = "visible";
break;
case "create_delta":
layer.style.visibility = "visible";
break;
case "create_image":
layer.style.visibility = "visible";
break;
case "delete":
layer.style.visibility = "hidden";
break;
}}
}}
function undoLayerEvent(n) {{
var layer = document.getElementById("layer_" + layer_events[n].filename);
switch (layer_events[n].op) {{
case "flush":
layer.style.visibility = "hidden";
break;
case "create_delta":
layer.style.visibility = "hidden";
break;
case "create_image":
layer.style.visibility = "hidden";
break;
case "delete":
layer.style.visibility = "visible";
break;
}}
}}
var last_slider_pos = 0
var last_layer_event = 0
var moveSlider = function(new_pos) {{
if (new_pos > last_slider_pos) {{
while (last_layer_event < layer_events.length - 1) {{
if (layer_events[last_layer_event + 1].time_rel > new_pos) {{
break;
}}
last_layer_event += 1;
redoLayerEvent(last_layer_event)
}}
}}
if (new_pos < last_slider_pos) {{
while (last_layer_event >= 0) {{
if (layer_events[last_layer_event].time_rel <= new_pos) {{
break;
}}
undoLayerEvent(last_layer_event)
last_layer_event -= 1;
}}
}}
last_slider_pos = new_pos;
document.getElementById("debug_pos").textContent=new_pos;
if (last_layer_event >= 0) {{
document.getElementById("debug_layer_event").textContent=last_layer_event + " " + layer_events[last_layer_event].time_rel + " " + layer_events[last_layer_event].op;
}} else {{
document.getElementById("debug_layer_event").textContent="begin";
}}
}}
</script>
<div class="topbar">
<div class="slidercontainer">
<label for="time-slider">TIME</label>:
<input id="time-slider" class="slider" type="range" min="0" max="{last_time_rel}" value="0" oninput="moveSlider(this.value)"><br>
pos: <span id="debug_pos"></span><br>
event: <span id="debug_layer_event"></span><br>
gc: <span id="debug_gc_event"></span><br>
</div>
<button onclick="startAnimation()">Play</button>
<button onclick="stopAnimation()">Stop</button>
<svg class="legend">
<rect x=5 y=0 width=20 height=20 style="fill:rgb(128,128,128);stroke:rgb(0,0,0);stroke-width:0.5;fill-opacity:1;stroke-opacity:1;"/>
<line x1=5 y1=30 x2=25 y2=30 style="fill:rgb(128,0,128);stroke:rgb(128,0,128);stroke-width:3;fill-opacity:1;stroke-opacity:1;"/>
<line x1=0 y1=40 x2=30 y2=40 style="fill:none;stroke:rgb(255,0,0);stroke-width:0.5;fill-opacity:1;stroke-opacity:1;"/>
</svg>
</div>
<div class="main">
{svg}
</div>
</body>
</html>
"#
)?;
Ok(())
}

View File

@@ -0,0 +1,35 @@
use pageserver_compaction::interface::CompactionLayer;
use pageserver_compaction::simulator::MockTimeline;
/// Test the extreme case that there are so many updates for a single key that
/// even if we produce an extremely narrow delta layer, spanning just that one
/// key, we still too many records to fit in the target file size. We need to
/// split in the LSN dimension too in that case.
///
/// TODO: The code to avoid this problem has not been implemented yet! So the
/// assertion currently fails, but we need to make it not fail.
#[ignore]
#[tokio::test]
async fn test_many_updates_for_single_key() {
let mut executor = MockTimeline::new();
executor.target_file_size = 10_000_000; // 10 MB
// Ingest 100 MB of updates to a single key.
for _ in 1..1000 {
executor.ingest_uniform(100, 10, &(0..100_000)).unwrap();
executor.ingest_uniform(10_000, 10, &(0..1)).unwrap();
executor.compact().await.unwrap();
}
// Check that all the layers are smaller than the target size (with some slop)
for l in executor.live_layers.iter() {
println!("layer {}: {}", l.short_id(), l.file_size());
}
for l in executor.live_layers.iter() {
assert!(l.file_size() < executor.target_file_size * 2);
// sanity check that none of the delta layers are stupidly small either
if l.is_delta() {
assert!(l.file_size() > executor.target_file_size / 2);
}
}
}

View File

@@ -17,7 +17,7 @@ use tracing::*;
use utils::id::NodeId;
mod metrics;
use metrics::MetricsKey;
use crate::consumption_metrics::metrics::MetricsKey;
mod disk_cache;
mod upload;

View File

@@ -3632,6 +3632,7 @@ pub(crate) mod harness {
compaction_target_size: Some(tenant_conf.compaction_target_size),
compaction_period: Some(tenant_conf.compaction_period),
compaction_threshold: Some(tenant_conf.compaction_threshold),
compaction_algorithm: Some(tenant_conf.compaction_algorithm),
gc_horizon: Some(tenant_conf.gc_horizon),
gc_period: Some(tenant_conf.gc_period),
image_creation_threshold: Some(tenant_conf.image_creation_threshold),

View File

@@ -9,6 +9,7 @@
//! may lead to a data loss.
//!
use anyhow::bail;
use pageserver_api::models::CompactionAlgorithm;
use pageserver_api::models::EvictionPolicy;
use pageserver_api::models::{self, ThrottleConfig};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
@@ -20,6 +21,7 @@ use std::time::Duration;
use utils::generation::Generation;
pub mod defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
@@ -27,12 +29,17 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_TIMEOUT: &str = "10 m";
// FIXME the below configs are only used by legacy algorithm. The new algorithm
// has different parameters.
// Target file size, when creating image and delta layers.
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
super::CompactionAlgorithm::Legacy;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
@@ -305,6 +312,7 @@ pub struct TenantConf {
pub compaction_period: Duration,
// Level0 delta layer threshold for compaction.
pub compaction_threshold: usize,
pub compaction_algorithm: CompactionAlgorithm,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is #of bytes of WAL.
@@ -377,6 +385,10 @@ pub struct TenantConfOpt {
#[serde(default)]
pub compaction_threshold: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub compaction_algorithm: Option<CompactionAlgorithm>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub gc_horizon: Option<u64>,
@@ -457,6 +469,9 @@ impl TenantConfOpt {
compaction_threshold: self
.compaction_threshold
.unwrap_or(global_conf.compaction_threshold),
compaction_algorithm: self
.compaction_algorithm
.unwrap_or(global_conf.compaction_algorithm),
gc_horizon: self.gc_horizon.unwrap_or(global_conf.gc_horizon),
gc_period: self.gc_period.unwrap_or(global_conf.gc_period),
image_creation_threshold: self
@@ -503,6 +518,7 @@ impl Default for TenantConf {
compaction_period: humantime::parse_duration(DEFAULT_COMPACTION_PERIOD)
.expect("cannot parse default compaction period"),
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
compaction_algorithm: DEFAULT_COMPACTION_ALGORITHM,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),
@@ -580,6 +596,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
Self {
checkpoint_distance: value.checkpoint_distance,
checkpoint_timeout: value.checkpoint_timeout.map(humantime),
compaction_algorithm: value.compaction_algorithm,
compaction_target_size: value.compaction_target_size,
compaction_period: value.compaction_period.map(humantime),
compaction_threshold: value.compaction_threshold,

View File

@@ -1120,3 +1120,15 @@ impl AsRef<DeltaLayerInner> for DeltaLayerInner {
self
}
}
impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
fn key(&self) -> Key {
self.key
}
fn lsn(&self) -> Lsn {
self.lsn
}
fn size(&self) -> u64 {
self.size
}
}

View File

@@ -1,3 +1,4 @@
mod compaction;
pub mod delete;
mod eviction_task;
mod init;
@@ -18,8 +19,8 @@ use once_cell::sync::Lazy;
use pageserver_api::{
keyspace::KeySpaceAccum,
models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
LayerMapInfo, TimelineState,
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
EvictionPolicy, LayerMapInfo, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, TenantShardId},
@@ -63,6 +64,7 @@ use crate::tenant::{
use crate::{
context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
disk_usage_eviction_task::DiskUsageEvictionInfo,
pgdatadir_mapping::CollectKeySpaceError,
};
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
use crate::{
@@ -1093,6 +1095,19 @@ impl Timeline {
return Ok(());
}
match self.get_compaction_algorithm() {
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
}
}
/// TODO: cancellation
async fn compact_legacy(
self: &Arc<Self>,
_cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
@@ -1498,6 +1513,13 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
fn get_compaction_algorithm(&self) -> CompactionAlgorithm {
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_algorithm
.unwrap_or(self.conf.default_tenant_conf.compaction_algorithm)
}
fn get_eviction_policy(&self) -> EvictionPolicy {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf
@@ -3639,6 +3661,18 @@ pub(crate) enum CompactionError {
Other(#[from] anyhow::Error),
}
impl From<CollectKeySpaceError> for CompactionError {
fn from(err: CollectKeySpaceError) -> Self {
match err {
CollectKeySpaceError::Cancelled
| CollectKeySpaceError::PageRead(PageReconstructError::Cancelled) => {
CompactionError::ShuttingDown
}
e => CompactionError::Other(e.into()),
}
}
}
#[serde_as]
#[derive(serde::Serialize)]
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
@@ -3758,7 +3792,7 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
}
impl Timeline {
/// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
async fn compact_level0_phase1(
self: &Arc<Self>,
guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
@@ -4237,13 +4271,24 @@ impl Timeline {
return Ok(());
}
self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
.await?;
Ok(())
}
async fn finish_compact_batch(
self: &Arc<Self>,
new_deltas: &[ResidentLayer],
new_images: &[ResidentLayer],
layers_to_remove: &[Layer],
) -> anyhow::Result<()> {
let mut guard = self.layers.write().await;
let mut duplicated_layers = HashSet::new();
let mut insert_layers = Vec::with_capacity(new_layers.len());
let mut insert_layers = Vec::with_capacity(new_deltas.len());
for l in &new_layers {
for l in new_deltas {
if guard.contains(l.as_ref()) {
// expected in tests
tracing::error!(layer=%l, "duplicated L1 layer");
@@ -4254,24 +4299,28 @@ impl Timeline {
// because we have not implemented L0 => L0 compaction.
duplicated_layers.insert(l.layer_desc().key());
} else if LayerMap::is_l0(l.layer_desc()) {
return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
bail!("compaction generates a L0 layer file as output, which will cause infinite compaction.");
} else {
insert_layers.push(l.clone());
}
}
let remove_layers = {
let mut deltas_to_compact = deltas_to_compact;
// only remove those inputs which were not outputs
deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
deltas_to_compact
};
// only remove those inputs which were not outputs
let remove_layers: Vec<Layer> = layers_to_remove
.iter()
.filter(|l| !duplicated_layers.contains(&l.layer_desc().key()))
.cloned()
.collect();
if !new_images.is_empty() {
guard.track_new_image_layers(new_images, &self.metrics);
}
// deletion will happen later, the layer file manager calls garbage_collect_on_drop
guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
remote_client.schedule_compaction_update(&remove_layers, new_deltas)?;
}
drop_wlock(guard);

View File

@@ -0,0 +1,477 @@
//! New compaction implementation. The algorithm itself is implemented in the
//! compaction crate. This file implements the callbacks and structs that allow
//! the algorithm to drive the process.
//!
//! The old legacy algorithm is implemented directly in `timeline.rs`.
use std::ops::{Deref, Range};
use std::sync::Arc;
use super::Timeline;
use async_trait::async_trait;
use fail::fail_point;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use crate::context::RequestContext;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
use crate::tenant::timeline::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::DeltaLayer;
use crate::tenant::PageReconstructError;
use crate::ZERO_PAGE;
use crate::keyspace::KeySpace;
use crate::repository::Key;
use utils::lsn::Lsn;
use pageserver_compaction::helpers::overlaps_with;
use pageserver_compaction::interface::*;
use super::CompactionError;
impl Timeline {
/// Entry point for new tiered compaction algorithm.
///
/// All the real work is in the implementation in the pageserver_compaction
/// crate. The code here would apply to any algorithm implemented by the
/// same interface, but tiered is the only one at the moment.
///
/// TODO: cancellation
pub(crate) async fn compact_tiered(
self: &Arc<Self>,
_cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
let fanout = self.get_compaction_threshold() as u64;
let target_file_size = self.get_checkpoint_distance();
// Find the top of the historical layers
let end_lsn = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let l0_deltas = layers.get_level0_deltas()?;
drop(guard);
// As an optimization, if we find that there are too few L0 layers,
// bail out early. We know that the compaction algorithm would do
// nothing in that case.
if l0_deltas.len() < fanout as usize {
// doesn't need compacting
return Ok(());
}
l0_deltas.iter().map(|l| l.lsn_range.end).max().unwrap()
};
// Is the timeline being deleted?
if self.is_stopping() {
trace!("Dropping out of compaction on timeline shutdown");
return Err(CompactionError::ShuttingDown);
}
let keyspace = self.collect_keyspace(end_lsn, ctx).await?;
let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace));
let ctx_adaptor = RequestContextAdaptor(ctx.clone());
pageserver_compaction::compact_tiered::compact_tiered(
&mut adaptor,
end_lsn,
target_file_size,
fanout,
&ctx_adaptor,
)
.await?;
adaptor.flush_updates().await?;
Ok(())
}
}
struct TimelineAdaptor {
timeline: Arc<Timeline>,
keyspace: (Lsn, KeySpace),
new_deltas: Vec<ResidentLayer>,
new_images: Vec<ResidentLayer>,
layers_to_delete: Vec<Arc<PersistentLayerDesc>>,
}
impl TimelineAdaptor {
pub fn new(timeline: &Arc<Timeline>, keyspace: (Lsn, KeySpace)) -> Self {
Self {
timeline: timeline.clone(),
keyspace,
new_images: Vec::new(),
new_deltas: Vec::new(),
layers_to_delete: Vec::new(),
}
}
pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
let layers_to_delete = {
let guard = self.timeline.layers.read().await;
self.layers_to_delete
.iter()
.map(|x| guard.get_from_desc(x))
.collect::<Vec<Layer>>()
};
self.timeline
.finish_compact_batch(&self.new_deltas, &self.new_images, &layers_to_delete)
.await?;
self.new_images.clear();
self.new_deltas.clear();
self.layers_to_delete.clear();
Ok(())
}
}
#[derive(Clone)]
struct ResidentDeltaLayer(ResidentLayer);
#[derive(Clone)]
struct ResidentImageLayer(ResidentLayer);
#[async_trait]
impl CompactionJobExecutor for TimelineAdaptor {
type Key = crate::repository::Key;
type Layer = OwnArc<PersistentLayerDesc>;
type DeltaLayer = ResidentDeltaLayer;
type ImageLayer = ResidentImageLayer;
type RequestContext = RequestContextAdaptor;
async fn get_layers(
&mut self,
key_range: &Range<Key>,
lsn_range: &Range<Lsn>,
_ctx: &RequestContextAdaptor,
) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
self.flush_updates().await?;
let guard = self.timeline.layers.read().await;
let layer_map = guard.layer_map();
let result = layer_map
.iter_historic_layers()
.filter(|l| {
overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
})
.map(OwnArc)
.collect();
Ok(result)
}
async fn get_keyspace(
&mut self,
key_range: &Range<Key>,
lsn: Lsn,
_ctx: &RequestContextAdaptor,
) -> anyhow::Result<Vec<Range<Key>>> {
if lsn == self.keyspace.0 {
Ok(pageserver_compaction::helpers::intersect_keyspace(
&self.keyspace.1.ranges,
key_range,
))
} else {
// The current compaction implementatin only ever requests the key space
// at the compaction end LSN.
anyhow::bail!("keyspace not available for requested lsn");
}
}
async fn downcast_delta_layer(
&self,
layer: &OwnArc<PersistentLayerDesc>,
) -> anyhow::Result<Option<ResidentDeltaLayer>> {
// this is a lot more complex than a simple downcast...
if layer.is_delta() {
let l = {
let guard = self.timeline.layers.read().await;
guard.get_from_desc(layer)
};
let result = l.download_and_keep_resident().await?;
Ok(Some(ResidentDeltaLayer(result)))
} else {
Ok(None)
}
}
async fn create_image(
&mut self,
lsn: Lsn,
key_range: &Range<Key>,
ctx: &RequestContextAdaptor,
) -> anyhow::Result<()> {
Ok(self.create_image_impl(lsn, key_range, ctx).await?)
}
async fn create_delta(
&mut self,
lsn_range: &Range<Lsn>,
key_range: &Range<Key>,
input_layers: &[ResidentDeltaLayer],
ctx: &RequestContextAdaptor,
) -> anyhow::Result<()> {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
let mut all_entries = Vec::new();
for dl in input_layers.iter() {
all_entries.extend(dl.load_keys(ctx).await?);
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_entries.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
let mut writer = DeltaLayerWriter::new(
self.timeline.conf,
self.timeline.timeline_id,
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
)
.await?;
let mut dup_values = 0;
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let mut prev: Option<(Key, Lsn)> = None;
for &DeltaEntry {
key, lsn, ref val, ..
} in all_entries.iter()
{
if prev == Some((key, lsn)) {
// This is a duplicate. Skip it.
//
// It can happen if compaction is interrupted after writing some
// layers but not all, and we are compacting the range again.
// The calculations in the algorithm assume that there are no
// duplicates, so the math on targeted file size is likely off,
// and we will create smaller files than expected.
dup_values += 1;
continue;
}
let value = val.load(ctx).await?;
writer.put_value(key, lsn, value).await?;
prev = Some((key, lsn));
}
if dup_values > 0 {
warn!("delta layer created with {} duplicate values", dup_values);
}
fail_point!("delta-layer-writer-fail-before-finish", |_| {
Err(anyhow::anyhow!(
"failpoint delta-layer-writer-fail-before-finish"
))
});
let new_delta_layer = writer
.finish(prev.unwrap().0.next(), &self.timeline)
.await?;
self.new_deltas.push(new_delta_layer);
Ok(())
}
async fn delete_layer(
&mut self,
layer: &OwnArc<PersistentLayerDesc>,
_ctx: &RequestContextAdaptor,
) -> anyhow::Result<()> {
self.layers_to_delete.push(layer.clone().0);
Ok(())
}
}
impl TimelineAdaptor {
async fn create_image_impl(
&mut self,
lsn: Lsn,
key_range: &Range<Key>,
ctx: &RequestContextAdaptor,
) -> Result<(), PageReconstructError> {
let timer = self.timeline.metrics.create_images_time_histo.start_timer();
let mut image_layer_writer = ImageLayerWriter::new(
self.timeline.conf,
self.timeline.timeline_id,
self.timeline.tenant_shard_id,
key_range,
lsn,
)
.await?;
fail_point!("image-layer-writer-fail-before-finish", |_| {
Err(PageReconstructError::Other(anyhow::anyhow!(
"failpoint image-layer-writer-fail-before-finish"
)))
});
let keyspace_ranges = self.get_keyspace(key_range, lsn, ctx).await?;
for range in &keyspace_ranges {
let mut key = range.start;
while key < range.end {
let img = match self.timeline.get(key, lsn, ctx).await {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(err);
}
}
};
image_layer_writer.put_image(key, img).await?;
key = key.next();
}
}
let image_layer = image_layer_writer.finish(&self.timeline).await?;
self.new_images.push(image_layer);
timer.stop_and_record();
Ok(())
}
}
pub struct RequestContextAdaptor(pub RequestContext);
impl std::ops::Deref for RequestContextAdaptor {
type Target = RequestContext;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl CompactionRequestContext for RequestContextAdaptor {}
#[derive(Debug, Clone)]
pub struct OwnArc<T>(pub Arc<T>);
impl<T> Deref for OwnArc<T> {
type Target = <Arc<T> as Deref>::Target;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> AsRef<T> for OwnArc<T> {
fn as_ref(&self) -> &T {
self.0.as_ref()
}
}
impl CompactionLayer<Key> for OwnArc<PersistentLayerDesc> {
fn key_range(&self) -> &Range<Key> {
&self.key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.lsn_range
}
fn file_size(&self) -> u64 {
self.file_size
}
fn short_id(&self) -> std::string::String {
self.as_ref().short_id().to_string()
}
fn is_delta(&self) -> bool {
self.as_ref().is_delta()
}
}
impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
fn key_range(&self) -> &Range<Key> {
&self.layer_desc().key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.layer_desc().lsn_range
}
fn file_size(&self) -> u64 {
self.layer_desc().file_size
}
fn short_id(&self) -> std::string::String {
self.layer_desc().short_id().to_string()
}
fn is_delta(&self) -> bool {
true
}
}
use crate::tenant::timeline::DeltaEntry;
impl CompactionLayer<Key> for ResidentDeltaLayer {
fn key_range(&self) -> &Range<Key> {
&self.0.layer_desc().key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.0.layer_desc().lsn_range
}
fn file_size(&self) -> u64 {
self.0.layer_desc().file_size
}
fn short_id(&self) -> std::string::String {
self.0.layer_desc().short_id().to_string()
}
fn is_delta(&self) -> bool {
true
}
}
#[async_trait]
impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
type DeltaEntry<'a> = DeltaEntry<'a>;
async fn load_keys<'a>(
&self,
ctx: &RequestContextAdaptor,
) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
self.0.load_keys(ctx).await
}
}
impl CompactionLayer<Key> for ResidentImageLayer {
fn key_range(&self) -> &Range<Key> {
&self.0.layer_desc().key_range
}
fn lsn_range(&self) -> &Range<Lsn> {
&self.0.layer_desc().lsn_range
}
fn file_size(&self) -> u64 {
self.0.layer_desc().file_size
}
fn short_id(&self) -> std::string::String {
self.0.layer_desc().short_id().to_string()
}
fn is_delta(&self) -> bool {
false
}
}
impl CompactionImageLayer<TimelineAdaptor> for ResidentImageLayer {}

View File

@@ -160,6 +160,9 @@ def test_fully_custom_config(positive_env: NeonEnv):
"compaction_target_size": 1048576,
"checkpoint_distance": 10000,
"checkpoint_timeout": "13m",
"compaction_algorithm": {
"kind": "Tiered",
},
"eviction_policy": {
"kind": "LayerAccessThreshold",
"period": "20s",