bottom-most-compaction: use in test_gc_feedback + fix bugs (#8103)
Adds a manual compaction trigger and uses gc-compaction in test_gc_feedback.

Part of https://github.com/neondatabase/neon/issues/8002

```
test_gc_feedback[debug-pg15].logical_size: 50 Mb
test_gc_feedback[debug-pg15].physical_size: 2269 Mb
test_gc_feedback[debug-pg15].physical/logical ratio: 44.5302
test_gc_feedback[debug-pg15].max_total_num_of_deltas: 7
test_gc_feedback[debug-pg15].max_num_of_deltas_above_image: 2
test_gc_feedback[debug-pg15].logical_size_after_bottom_most_compaction: 50 Mb
test_gc_feedback[debug-pg15].physical_size_after_bottom_most_compaction: 287 Mb
test_gc_feedback[debug-pg15].physical/logical ratio after bottom_most_compaction: 5.6312
test_gc_feedback[debug-pg15].max_total_num_of_deltas_after_bottom_most_compaction: 4
test_gc_feedback[debug-pg15].max_num_of_deltas_above_image_after_bottom_most_compaction: 1
```

## Summary of changes

* Add the manual compaction trigger
* Use it in test_gc_feedback
* Add a guard to avoid running it with retain_lsns
* Fix: do `schedule_compaction_update` after compaction
* Fix: supply deltas in the correct order to reconstruct values

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
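For reference, the new trigger is exposed as an extra boolean query parameter on the per-timeline compact endpoint. Below is a minimal sketch of driving it from a client; the endpoint path, host/port, and the use of the `reqwest` crate are assumptions for illustration and are not part of this diff.

```rust
// Sketch only: path, host and port are assumed, not taken from this commit.
// Requires: reqwest with the "blocking" feature.
fn trigger_gc_compaction(
    base_url: &str, // e.g. "http://127.0.0.1:9898" (assumed pageserver management API)
    tenant_id: &str,
    timeline_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let url = format!("{base_url}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact");
    let resp = reqwest::blocking::Client::new()
        .put(url)
        // The query parameter added by this commit; the handler only honors it
        // when the pageserver is built with the "testing" feature.
        .query(&[("enhanced_gc_bottom_most_compaction", "true")])
        .send()?;
    // A non-2xx response (e.g. the testing-mode guard) becomes an error here.
    resp.error_for_status()?;
    Ok(())
}
```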
@@ -1652,6 +1652,14 @@ async fn timeline_compact_handler(
     if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
         flags |= CompactFlags::ForceImageLayerCreation;
     }
+    if Some(true) == parse_query_param::<_, bool>(&request, "enhanced_gc_bottom_most_compaction")? {
+        if !cfg!(feature = "testing") {
+            return Err(ApiError::InternalServerError(anyhow!(
+                "enhanced_gc_bottom_most_compaction is only available in testing mode"
+            )));
+        }
+        flags |= CompactFlags::EnhancedGcBottomMostCompaction;
+    }
     let wait_until_uploaded =
         parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
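The handler follows the existing pattern: each boolean query parameter sets one bit in an `EnumSet<CompactFlags>` (the `enumset` type already used by `compact_legacy`), and the new flag is additionally gated on the `testing` feature. A standalone sketch of that pattern, assuming a plain string map in place of the real request type and an illustrative `parse_flag` helper:

```rust
use std::collections::HashMap;

use enumset::{EnumSet, EnumSetType};

// Mirrors the flag enum touched by this commit; EnumSetType lets the
// variants be combined into a compact bit set.
#[derive(EnumSetType, Debug)]
pub enum CompactFlags {
    ForceRepartition,
    ForceImageLayerCreation,
    EnhancedGcBottomMostCompaction,
}

// Illustrative stand-in for parse_query_param: query params as a string map.
fn parse_flag(query: &HashMap<String, String>, name: &str) -> bool {
    query.get(name).map(|v| v == "true").unwrap_or(false)
}

fn flags_from_query(query: &HashMap<String, String>) -> Result<EnumSet<CompactFlags>, String> {
    let mut flags = EnumSet::empty();
    if parse_flag(query, "force_repartition") {
        flags |= CompactFlags::ForceRepartition;
    }
    if parse_flag(query, "force_image_layer_creation") {
        flags |= CompactFlags::ForceImageLayerCreation;
    }
    if parse_flag(query, "enhanced_gc_bottom_most_compaction") {
        // Same guard as the handler: the gc-compaction path is only reachable
        // in builds with the "testing" feature enabled.
        if !cfg!(feature = "testing") {
            return Err("enhanced_gc_bottom_most_compaction is only available in testing mode".into());
        }
        flags |= CompactFlags::EnhancedGcBottomMostCompaction;
    }
    Ok(flags)
}
```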
@@ -928,7 +928,6 @@ impl DeltaLayerInner {
     }
 
     /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
-    #[cfg(test)]
     pub(super) async fn load_key_values(
         &self,
         ctx: &RequestContext,
@@ -486,7 +486,6 @@ impl ImageLayerInner {
     }
 
     /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
-    #[cfg(test)]
     pub(super) async fn load_key_values(
         &self,
         ctx: &RequestContext,
@@ -389,7 +389,6 @@ impl Layer {
     }
 
     /// Get all key/values in the layer. Should be replaced with an iterator-based API in the future.
-    #[cfg(test)]
     pub(crate) async fn load_key_values(
         &self,
         ctx: &RequestContext,
@@ -1774,7 +1773,6 @@ impl DownloadedLayer {
         }
     }
 
-    #[cfg(test)]
     async fn load_key_values(
         &self,
         owner: &Arc<LayerInner>,
@@ -686,6 +686,7 @@ pub enum GetLogicalSizePriority {
 pub(crate) enum CompactFlags {
     ForceRepartition,
     ForceImageLayerCreation,
+    EnhancedGcBottomMostCompaction,
 }
 
 impl std::fmt::Debug for Timeline {
@@ -1096,7 +1097,6 @@ impl Timeline {
     /// scan iterator interface. We could optimize this interface later to avoid some checks in the vectored
     /// get path to maintain and split the probing and to-be-probe keyspace. We also need to ensure that
     /// the scan operation will not cause OOM in the future.
-    #[allow(dead_code)]
     pub(crate) async fn scan(
         &self,
         keyspace: KeySpace,
@@ -47,10 +47,14 @@ impl Timeline {
     /// TODO: cancellation
     pub(crate) async fn compact_legacy(
         self: &Arc<Self>,
-        _cancel: &CancellationToken,
+        cancel: &CancellationToken,
         flags: EnumSet<CompactFlags>,
         ctx: &RequestContext,
     ) -> Result<(), CompactionError> {
+        if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
+            return self.compact_with_gc(cancel, ctx).await;
+        }
+
         // High level strategy for compaction / image creation:
         //
         // 1. First, calculate the desired "partitioning" of the
@@ -959,15 +963,20 @@ impl Timeline {
     /// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
     /// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
     /// and create delta layers with all deltas >= gc horizon.
-    #[cfg(test)]
     pub(crate) async fn compact_with_gc(
         self: &Arc<Self>,
         _cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> Result<(), CompactionError> {
-        use crate::tenant::storage_layer::ValueReconstructState;
         use std::collections::BTreeSet;
 
+        use crate::tenant::storage_layer::ValueReconstructState;
+        info!("running enhanced gc bottom-most compaction");
+
+        scopeguard::defer! {
+            info!("done enhanced gc bottom-most compaction");
+        };
+
         // Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
         // The layer selection has the following properties:
         // 1. If a layer is in the selection, all layers below it are in the selection.
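The entry/exit logging above relies on `scopeguard::defer!`, which runs its body when the enclosing scope is left on any path (normal return, an early `?`, or a panic), so the "done" line is emitted even if compaction bails out. A minimal standalone sketch of the same pattern; the function body here is a placeholder, not the real compaction code:

```rust
use tracing::info;

fn run_compaction_step() -> anyhow::Result<()> {
    info!("running enhanced gc bottom-most compaction");
    scopeguard::defer! {
        // Runs when the scope exits, including early returns via `?`.
        info!("done enhanced gc bottom-most compaction");
    };

    // ... placeholder for the actual compaction work ...
    Ok(())
}
```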
@@ -976,6 +985,11 @@ impl Timeline {
             let guard = self.layers.read().await;
             let layers = guard.layer_map();
             let gc_info = self.gc_info.read().unwrap();
+            if !gc_info.retain_lsns.is_empty() || !gc_info.leases.is_empty() {
+                return Err(CompactionError::Other(anyhow!(
+                    "enhanced legacy compaction currently does not support retain_lsns (branches)"
+                )));
+            }
             let gc_cutoff = Lsn::min(gc_info.cutoffs.horizon, gc_info.cutoffs.pitr);
             let mut selected_layers = Vec::new();
             // TODO: consider retain_lsns
@@ -987,6 +1001,11 @@ impl Timeline {
             }
             (selected_layers, gc_cutoff)
         };
+        info!(
+            "picked {} layers for compaction with gc_cutoff={}",
+            layer_selection.len(),
+            gc_cutoff
+        );
         // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
         // Also, collect the layer information to decide when to split the new delta layers.
         let mut all_key_values = Vec::new();
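The selection step picks every layer that lies below or intersects the cutoff, where the cutoff is the more conservative of the space horizon and the PITR cutoff, and it refuses to run when branches (`retain_lsns`) or LSN leases exist. A toy sketch of that selection rule under simplified, illustrative types (these structs are not the pageserver's layer map):

```rust
// Toy types for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn(u64);

#[derive(Debug, Clone)]
struct LayerDesc {
    name: String,
    lsn_start: Lsn, // inclusive
    lsn_end: Lsn,   // exclusive
}

/// Select all layers that start below the gc cutoff. Under this criterion the
/// selection is "downward closed": every layer underneath a selected layer also
/// starts below the cutoff, matching property 1 in the Step 0 comment above.
fn select_for_gc_compaction(
    layers: &[LayerDesc],
    space_horizon: Lsn,
    pitr_cutoff: Lsn,
    has_retain_lsns_or_leases: bool,
) -> Result<(Vec<LayerDesc>, Lsn), String> {
    if has_retain_lsns_or_leases {
        // Same guard as the commit: branches/leases are not supported yet.
        return Err("gc compaction does not support retain_lsns (branches) yet".into());
    }
    let gc_cutoff = space_horizon.min(pitr_cutoff);
    let selected = layers
        .iter()
        .filter(|l| l.lsn_start < gc_cutoff) // below or intersecting the cutoff
        .cloned()
        .collect();
    Ok((selected, gc_cutoff))
}
```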
@@ -1064,10 +1083,8 @@ impl Timeline {
            } else if *lsn <= horizon {
                match val {
                    crate::repository::Value::Image(image) => {
-                        if lsn <= &horizon {
-                            base_image = Some((*lsn, image.clone()));
-                            break;
-                        }
+                        base_image = Some((*lsn, image.clone()));
+                        break;
                    }
                    crate::repository::Value::WalRecord(wal) => {
                        delta_above_base_image.push((*lsn, wal.clone()));
@@ -1075,7 +1092,7 @@ impl Timeline {
                    }
                }
            }
-            delta_above_base_image.reverse();
+            // do not reverse delta_above_base_image, reconstruct state expects reversely-ordered records
            keys_above_horizon.reverse();
            let state = ValueReconstructState {
                img: base_image,
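The second bug fix is about record order: the collection loop appears to walk entries from newest to oldest LSN (it breaks once it finds a base image at or below the horizon), so the WAL records above the base image are already in the descending order the reconstruct state expects, and reversing them handed them to reconstruction oldest-first. A toy model of that ordering contract; this is not the pageserver's walredo, and the "apply" step just concatenates strings to make the ordering visible:

```rust
// Toy value-reconstruction model: an optional base image plus WAL "records",
// where `records` must be ordered newest-first (descending LSN), mirroring the
// comment added in this commit.
struct ReconstructState {
    img: Option<(u64, String)>,
    records: Vec<(u64, String)>, // newest first
}

fn reconstruct(state: &ReconstructState) -> String {
    let mut value = state
        .img
        .as_ref()
        .map(|(_, img)| img.clone())
        .unwrap_or_default();
    // Apply records oldest-to-newest, i.e. iterate the newest-first vector in reverse.
    for (_, rec) in state.records.iter().rev() {
        value.push_str(rec);
    }
    value
}

fn main() {
    // Deltas collected while scanning from the newest LSN downwards come out
    // newest-first, which is exactly what the reconstruct state wants.
    // Reversing them here (as the old code did) would corrupt the result.
    let state = ReconstructState {
        img: Some((10, "base".to_string())),
        records: vec![(30, "+c"), (20, "+b")] // newest first
            .into_iter()
            .map(|(lsn, s)| (lsn, s.to_string()))
            .collect(),
    };
    assert_eq!(reconstruct(&state), "base+b+c");
    println!("{}", reconstruct(&state));
}
```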
@@ -1200,6 +1217,11 @@ impl Timeline {
         );
 
         let image_layer = image_layer_writer.finish(self, ctx).await?;
+        info!(
+            "produced {} delta layers and {} image layers",
+            delta_layers.len(),
+            1
+        );
         let mut compact_to = Vec::new();
         compact_to.extend(delta_layers);
         compact_to.push(image_layer);
@@ -1208,6 +1230,9 @@ impl Timeline {
             let mut guard = self.layers.write().await;
             guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
         };
+
+        self.remote_client
+            .schedule_compaction_update(&layer_selection, &compact_to)?;
         Ok(())
     }
 }
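The other fix adds the `schedule_compaction_update` call after the layer-map swap, so remote storage is told about the deleted and newly created layers once local readers can already see them. A toy sketch of that sequencing with stand-in types; nothing here is the pageserver's real layer map or remote client:

```rust
use std::collections::VecDeque;

// Stand-ins for illustration only.
#[derive(Debug, Clone)]
struct Layer(String);

#[derive(Default)]
struct LayerMap {
    layers: Vec<Layer>,
}

impl LayerMap {
    // Counterpart of finish_gc_compaction in this sketch: drop the
    // compacted-from layers, insert the compacted-to layers.
    fn finish_gc_compaction(&mut self, compact_from: &[Layer], compact_to: &[Layer]) {
        self.layers
            .retain(|l| !compact_from.iter().any(|f| f.0 == l.0));
        self.layers.extend(compact_to.iter().cloned());
    }
}

#[derive(Default)]
struct RemoteClient {
    // Pending index/upload operations, processed asynchronously in reality.
    queue: VecDeque<String>,
}

impl RemoteClient {
    fn schedule_compaction_update(&mut self, compact_from: &[Layer], compact_to: &[Layer]) {
        self.queue.push_back(format!(
            "delete {} layers, upload {} layers",
            compact_from.len(),
            compact_to.len()
        ));
    }
}

fn apply_compaction(
    layer_map: &mut LayerMap,
    remote: &mut RemoteClient,
    compact_from: &[Layer],
    compact_to: &[Layer],
) {
    // 1. Swap layers in the in-memory layer map first...
    layer_map.finish_gc_compaction(compact_from, compact_to);
    // 2. ...then schedule the remote update, matching the fix in this commit.
    remote.schedule_compaction_update(compact_from, compact_to);
}
```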
@@ -227,7 +227,6 @@ impl LayerManager {
    }

    /// Called when a GC-compaction is completed.
-    #[cfg(test)]
    pub(crate) fn finish_gc_compaction(
        &mut self,
        compact_from: &[Layer],
@@ -573,6 +573,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
        force_repartition=False,
        force_image_layer_creation=False,
        wait_until_uploaded=False,
+        enhanced_gc_bottom_most_compaction=False,
    ):
        self.is_testing_enabled_or_skip()
        query = {}
@@ -582,6 +583,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
            query["force_image_layer_creation"] = "true"
        if wait_until_uploaded:
            query["wait_until_uploaded"] = "true"
+        if enhanced_gc_bottom_most_compaction:
+            query["enhanced_gc_bottom_most_compaction"] = "true"

        log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
        res = self.put(
@@ -33,7 +33,7 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
        "checkpoint_distance": f"{1024 ** 2}",
        "compaction_target_size": f"{1024 ** 2}",
        # set PITR interval to be small, so we can do GC
-        "pitr_interval": "10 s",
+        "pitr_interval": "60 s",
        # "compaction_threshold": "3",
        # "image_creation_threshold": "2",
    }
@@ -99,6 +99,52 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
        MetricReport.LOWER_IS_BETTER,
    )

+    client.timeline_compact(tenant_id, timeline_id, enhanced_gc_bottom_most_compaction=True)
+    tline_detail = client.timeline_detail(tenant_id, timeline_id)
+    logical_size = tline_detail["current_logical_size"]
+    physical_size = tline_detail["current_physical_size"]
+
+    max_num_of_deltas_above_image = 0
+    max_total_num_of_deltas = 0
+    for key_range in client.perf_info(tenant_id, timeline_id):
+        max_total_num_of_deltas = max(max_total_num_of_deltas, key_range["total_num_of_deltas"])
+        max_num_of_deltas_above_image = max(
+            max_num_of_deltas_above_image, key_range["num_of_deltas_above_image"]
+        )
+    zenbenchmark.record(
+        "logical_size_after_bottom_most_compaction",
+        logical_size // MB,
+        "Mb",
+        MetricReport.LOWER_IS_BETTER,
+    )
+    zenbenchmark.record(
+        "physical_size_after_bottom_most_compaction",
+        physical_size // MB,
+        "Mb",
+        MetricReport.LOWER_IS_BETTER,
+    )
+    zenbenchmark.record(
+        "physical/logical ratio after bottom_most_compaction",
+        physical_size / logical_size,
+        "",
+        MetricReport.LOWER_IS_BETTER,
+    )
+    zenbenchmark.record(
+        "max_total_num_of_deltas_after_bottom_most_compaction",
+        max_total_num_of_deltas,
+        "",
+        MetricReport.LOWER_IS_BETTER,
+    )
+    zenbenchmark.record(
+        "max_num_of_deltas_above_image_after_bottom_most_compaction",
+        max_num_of_deltas_above_image,
+        "",
+        MetricReport.LOWER_IS_BETTER,
+    )
+
    with endpoint.cursor() as cur:
        cur.execute("SELECT * FROM t")  # ensure data is not corrupted

    layer_map_path = env.repo_dir / "layer-map.json"
    log.info(f"Writing layer map to {layer_map_path}")
    with layer_map_path.open("w") as f: