Compare commits

...

1 Commits

Author SHA1 Message Date
Alex Chi Z
bcea411009 feat(pageserver): l0-l0 compaction
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-02-13 21:51:01 +01:00
3 changed files with 29 additions and 7 deletions

View File

@@ -95,6 +95,8 @@ pub struct LayerMap {
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
///
/// NB: make sure to notify `watch_l0_deltas` on changes.
/// NB: this is not sorted by LSN, but by the order of insertion; always use the historic layer info to
/// retrieve L0 layers in order.
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
/// Notifies about L0 delta layer changes, sending the current number of L0 layers.

View File

@@ -5433,7 +5433,8 @@ impl Timeline {
// because we have not implemented L0 => L0 compaction.
duplicated_layers.insert(l.layer_desc().key());
} else if LayerMap::is_l0(&l.layer_desc().key_range, l.layer_desc().is_delta) {
return Err(CompactionError::Other(anyhow::anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
// This is not an error any more because we allow L0-L0 compaction.
// return Err(CompactionError::Other(anyhow::anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
} else {
insert_layers.push(l.clone());
}

View File

@@ -1199,11 +1199,13 @@ impl Timeline {
//
// In general, compaction_threshold should be <= compaction_upper_limit, but in case that
// the constraint is not respected, we use the larger of the two.
let delta_target_size =
std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
let delta_size_limit = std::cmp::max(
self.get_compaction_upper_limit(),
self.get_compaction_threshold(),
) as u64
* std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
* delta_target_size;
let mut fully_compacted = true;
@@ -1241,12 +1243,21 @@ impl Timeline {
end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
};
// If the total size of the deltas to compact is less than the target size, we produce the newly-generated layers
// as L0 files. Otherwise, we produce L1 files.
let l0_to_l0_compaction = deltas_to_compact
.iter()
.map(|l| l.metadata().file_size)
.sum::<u64>()
<= delta_target_size;
info!(
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total), l0_to_l0_compaction={}",
lsn_range.start,
lsn_range.end,
deltas_to_compact.len(),
level0_deltas.len()
level0_deltas.len(),
l0_to_l0_compaction
);
for l in deltas_to_compact.iter() {
@@ -1502,7 +1513,8 @@ impl Timeline {
dup_start_lsn = dup_end_lsn;
dup_end_lsn = lsn_range.end;
}
if writer.is_some() {
if writer.is_some() && !l0_to_l0_compaction {
// L0-L0 compaction ONLY produces one layer.
let written_size = writer.as_mut().unwrap().size();
let contains_hole =
next_hole < holes.len() && key >= holes[next_hole].key_range.end;
@@ -1552,7 +1564,7 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
key,
if l0_to_l0_compaction { Key::MIN } else { key },
if dup_end_lsn.is_valid() {
// this is a layer containing slice of values of the same key
debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
@@ -1591,7 +1603,14 @@ impl Timeline {
}
if let Some(writer) = writer {
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.finish(
if l0_to_l0_compaction {
Key::MAX
} else {
prev_key.unwrap().next()
},
ctx,
)
.await
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)