feat(pageserver): support dry-run for gc-compaction, add statistics (#8557)

Add dry-run mode that does not produce any image layer + delta layer. I
will use this code to do some experiments and see how much space we can
reclaim for tenants on staging. Part of
https://github.com/neondatabase/neon/issues/8002

* Add dry-run mode that runs the full compaction process without
updating the layer map. (We never call finish on the writers and the
files will be removed before exiting the function).
* Add compaction statistics and print them at the end of compaction.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2024-08-06 10:07:48 +08:00
committed by John Spray
parent 3b4b9c1d0b
commit 83afea3edb
4 changed files with 204 additions and 12 deletions

View File

@@ -6899,7 +6899,10 @@ mod tests {
}
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
for (idx, expected) in expected_result.iter().enumerate() {
assert_eq!(
@@ -6993,7 +6996,10 @@ mod tests {
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.space = Lsn(0x40);
}
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
Ok(())
}
@@ -7327,7 +7333,10 @@ mod tests {
}
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
for idx in 0..10 {
assert_eq!(
@@ -7353,7 +7362,10 @@ mod tests {
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.space = Lsn(0x40);
}
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
Ok(())
}
@@ -7898,11 +7910,28 @@ mod tests {
verify_result().await;
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
let mut dryrun_flags = EnumSet::new();
dryrun_flags.insert(CompactFlags::DryRun);
tline
.compact_with_gc(&cancel, dryrun_flags, &ctx)
.await
.unwrap();
// We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs
// cleaning things up, and therefore, we don't do sanity checks on the layer map during unit tests.
verify_result().await;
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await;
// compact again
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await;
// increase GC horizon and compact again
@@ -7912,11 +7941,17 @@ mod tests {
guard.cutoffs.time = Lsn(0x38);
guard.cutoffs.space = Lsn(0x38);
}
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await; // no wals between 0x30 and 0x38, so we should obtain the same result
// not increasing the GC horizon and compact again
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await;
Ok(())
@@ -8097,7 +8132,10 @@ mod tests {
verify_result().await;
let cancel = CancellationToken::new();
branch_tline.compact_with_gc(&cancel, &ctx).await.unwrap();
branch_tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await;

View File

@@ -753,6 +753,10 @@ struct ImageLayerWriterInner {
}
impl ImageLayerWriterInner {
fn size(&self) -> u64 {
self.tree.borrow_writer().size() + self.blob_writer.size()
}
///
/// Start building a new image layer.
///
@@ -1044,6 +1048,10 @@ impl ImageLayerWriter {
.finish(timeline, ctx, Some(end_key))
.await
}
pub(crate) fn size(&self) -> u64 {
self.inner.as_ref().unwrap().size()
}
}
impl Drop for ImageLayerWriter {

View File

@@ -704,6 +704,7 @@ pub(crate) enum CompactFlags {
ForceRepartition,
ForceImageLayerCreation,
EnhancedGcBottomMostCompaction,
DryRun,
}
impl std::fmt::Debug for Timeline {

View File

@@ -19,8 +19,10 @@ use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::key::KEY_SIZE;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::id::TimelineId;
@@ -41,6 +43,7 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use crate::keyspace::KeySpace;
use crate::repository::{Key, Value};
use crate::walrecord::NeonWalRecord;
use utils::lsn::Lsn;
@@ -73,6 +76,7 @@ impl KeyHistoryRetention {
key: Key,
delta_writer: &mut Vec<(Key, Lsn, Value)>,
mut image_writer: Option<&mut ImageLayerWriter>,
stat: &mut CompactionStatistics,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut first_batch = true;
@@ -82,6 +86,7 @@ impl KeyHistoryRetention {
let Value::Image(img) = &logs[0].1 else {
unreachable!()
};
stat.produce_image_key(img);
if let Some(image_writer) = image_writer.as_mut() {
image_writer.put_image(key, img.clone(), ctx).await?;
} else {
@@ -89,24 +94,111 @@ impl KeyHistoryRetention {
}
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer.push((key, lsn, val));
}
}
first_batch = false;
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer.push((key, lsn, val));
}
}
}
let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
for (lsn, val) in above_horizon_logs {
stat.produce_key(&val);
delta_writer.push((key, lsn, val));
}
Ok(())
}
}
#[derive(Debug, Serialize, Default)]
struct CompactionStatisticsNumSize {
num: u64,
size: u64,
}
#[derive(Debug, Serialize, Default)]
pub struct CompactionStatistics {
delta_layer_visited: CompactionStatisticsNumSize,
image_layer_visited: CompactionStatisticsNumSize,
delta_layer_produced: CompactionStatisticsNumSize,
image_layer_produced: CompactionStatisticsNumSize,
num_delta_layer_discarded: usize,
num_image_layer_discarded: usize,
num_unique_keys_visited: usize,
wal_keys_visited: CompactionStatisticsNumSize,
image_keys_visited: CompactionStatisticsNumSize,
wal_produced: CompactionStatisticsNumSize,
image_produced: CompactionStatisticsNumSize,
}
impl CompactionStatistics {
fn estimated_size_of_value(val: &Value) -> usize {
match val {
Value::Image(img) => img.len(),
Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
_ => std::mem::size_of::<NeonWalRecord>(),
}
}
fn estimated_size_of_key() -> usize {
KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
}
fn visit_delta_layer(&mut self, size: u64) {
self.delta_layer_visited.num += 1;
self.delta_layer_visited.size += size;
}
fn visit_image_layer(&mut self, size: u64) {
self.image_layer_visited.num += 1;
self.image_layer_visited.size += size;
}
fn on_unique_key_visited(&mut self) {
self.num_unique_keys_visited += 1;
}
fn visit_wal_key(&mut self, val: &Value) {
self.wal_keys_visited.num += 1;
self.wal_keys_visited.size +=
Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
}
fn visit_image_key(&mut self, val: &Value) {
self.image_keys_visited.num += 1;
self.image_keys_visited.size +=
Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
}
fn produce_key(&mut self, val: &Value) {
match val {
Value::Image(img) => self.produce_image_key(img),
Value::WalRecord(_) => self.produce_wal_key(val),
}
}
fn produce_wal_key(&mut self, val: &Value) {
self.wal_produced.num += 1;
self.wal_produced.size +=
Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
}
fn produce_image_key(&mut self, val: &Bytes) {
self.image_produced.num += 1;
self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
}
fn discard_delta_layer(&mut self) {
self.num_delta_layer_discarded += 1;
}
fn discard_image_layer(&mut self) {
self.num_image_layer_discarded += 1;
}
fn produce_delta_layer(&mut self, size: u64) {
self.delta_layer_produced.num += 1;
self.delta_layer_produced.size += size;
}
fn produce_image_layer(&mut self, size: u64) {
self.image_layer_produced.num += 1;
self.image_layer_produced.size += size;
}
}
impl Timeline {
/// TODO: cancellation
///
@@ -118,12 +210,18 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
self.compact_with_gc(cancel, ctx)
self.compact_with_gc(cancel, flags, ctx)
.await
.map_err(CompactionError::Other)?;
return Ok(false);
}
if flags.contains(CompactFlags::DryRun) {
return Err(CompactionError::Other(anyhow!(
"dry-run mode is not supported for legacy compaction for now"
)));
}
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
@@ -1641,6 +1739,7 @@ impl Timeline {
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use std::collections::BTreeSet;
@@ -1664,12 +1763,16 @@ impl Timeline {
)
.await?;
info!("running enhanced gc bottom-most compaction");
let dry_run = flags.contains(CompactFlags::DryRun);
info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
scopeguard::defer! {
info!("done enhanced gc bottom-most compaction");
};
let mut stat = CompactionStatistics::default();
// Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
// The layer selection has the following properties:
// 1. If a layer is in the selection, all layers below it are in the selection.
@@ -1740,6 +1843,9 @@ impl Timeline {
let key_range = desc.get_key_range();
delta_split_points.insert(key_range.start);
delta_split_points.insert(key_range.end);
stat.visit_delta_layer(desc.file_size());
} else {
stat.visit_image_layer(desc.file_size());
}
}
let mut delta_layers = Vec::new();
@@ -1775,6 +1881,8 @@ impl Timeline {
tline: &Arc<Timeline>,
lowest_retain_lsn: Lsn,
ctx: &RequestContext,
stats: &mut CompactionStatistics,
dry_run: bool,
last_batch: bool,
) -> anyhow::Result<Option<FlushDeltaResult>> {
// Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
@@ -1831,6 +1939,7 @@ impl Timeline {
let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
drop(guard);
if layer_generation == tline.generation {
stats.discard_delta_layer();
// TODO: depending on whether we design this compaction process to run along with
// other compactions, there could be layer map modifications after we drop the
// layer guard, and in case it creates duplicated layer key, we will still error
@@ -1857,6 +1966,10 @@ impl Timeline {
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
stats.produce_delta_layer(delta_layer_writer.size());
if dry_run {
return Ok(None);
}
let delta_layer = delta_layer_writer
.finish(delta_key.key_range.end, tline, ctx)
.await?;
@@ -1951,6 +2064,13 @@ impl Timeline {
let mut current_delta_split_point = 0;
let mut delta_layers = Vec::new();
while let Some((key, lsn, val)) = merge_iter.next().await? {
if cancel.is_cancelled() {
return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
}
match val {
Value::Image(_) => stat.visit_image_key(&val),
Value::WalRecord(_) => stat.visit_wal_key(&val),
}
if last_key.is_none() || last_key.as_ref() == Some(&key) {
if last_key.is_none() {
last_key = Some(key);
@@ -1958,6 +2078,7 @@ impl Timeline {
accumulated_values.push((key, lsn, val));
} else {
let last_key = last_key.as_mut().unwrap();
stat.on_unique_key_visited();
let retention = self
.generate_key_retention(
*last_key,
@@ -1974,6 +2095,7 @@ impl Timeline {
*last_key,
&mut delta_values,
image_layer_writer.as_mut(),
&mut stat,
ctx,
)
.await?;
@@ -1986,6 +2108,8 @@ impl Timeline {
self,
lowest_retain_lsn,
ctx,
&mut stat,
dry_run,
false,
)
.await?,
@@ -1998,6 +2122,7 @@ impl Timeline {
let last_key = last_key.expect("no keys produced during compaction");
// TODO: move this part to the loop body
stat.on_unique_key_visited();
let retention = self
.generate_key_retention(
last_key,
@@ -2014,6 +2139,7 @@ impl Timeline {
last_key,
&mut delta_values,
image_layer_writer.as_mut(),
&mut stat,
ctx,
)
.await?;
@@ -2026,6 +2152,8 @@ impl Timeline {
self,
lowest_retain_lsn,
ctx,
&mut stat,
dry_run,
true,
)
.await?,
@@ -2033,12 +2161,28 @@ impl Timeline {
assert!(delta_values.is_empty(), "unprocessed keys");
let image_layer = if discard_image_layer {
stat.discard_image_layer();
None
} else if let Some(writer) = image_layer_writer {
Some(writer.finish(self, ctx).await?)
stat.produce_image_layer(writer.size());
if !dry_run {
Some(writer.finish(self, ctx).await?)
} else {
None
}
} else {
None
};
info!(
"gc-compaction statistics: {}",
serde_json::to_string(&stat)?
);
if dry_run {
return Ok(());
}
info!(
"produced {} delta layers and {} image layers",
delta_layers.len(),
@@ -2062,6 +2206,7 @@ impl Timeline {
let mut layer_selection = layer_selection;
layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
compact_to.extend(image_layer);
// Step 3: Place back to the layer map.
{
let mut guard = self.layers.write().await;