mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
feat(pageserver): support partial gc-compaction for lowest retain lsn (#9134)
Part of https://github.com/neondatabase/neon/issues/8921 and https://github.com/neondatabase/neon/issues/9114. ## Summary of changes We start the partial compaction implementation with partial image layer generation. The partial compaction API now takes a key range. For now, we only generate image layers for that key range and, after compaction, remove the layers that are fully included in the key range. --------- Signed-off-by: Alex Chi Z <chi@neon.tech> Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
@@ -5138,6 +5138,7 @@ mod tests {
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
|
||||
use pageserver_api::value::Value;
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
use rand::{thread_rng, Rng};
|
||||
use storage_layer::PersistentLayerKey;
|
||||
use tests::storage_layer::ValuesReconstructState;
|
||||
@@ -7660,23 +7661,7 @@ mod tests {
|
||||
}
|
||||
|
||||
// Check if old layers are removed / new layers have the expected LSN
|
||||
let mut all_layers = tline.inspect_historic_layers().await.unwrap();
|
||||
all_layers.sort_by(|k1, k2| {
|
||||
(
|
||||
k1.is_delta,
|
||||
k1.key_range.start,
|
||||
k1.key_range.end,
|
||||
k1.lsn_range.start,
|
||||
k1.lsn_range.end,
|
||||
)
|
||||
.cmp(&(
|
||||
k2.is_delta,
|
||||
k2.key_range.start,
|
||||
k2.key_range.end,
|
||||
k2.lsn_range.start,
|
||||
k2.lsn_range.end,
|
||||
))
|
||||
});
|
||||
let all_layers = inspect_and_sort(&tline, None).await;
|
||||
assert_eq!(
|
||||
all_layers,
|
||||
vec![
|
||||
@@ -9220,4 +9205,249 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn inspect_and_sort(
|
||||
tline: &Arc<Timeline>,
|
||||
filter: Option<std::ops::Range<Key>>,
|
||||
) -> Vec<PersistentLayerKey> {
|
||||
let mut all_layers = tline.inspect_historic_layers().await.unwrap();
|
||||
if let Some(filter) = filter {
|
||||
all_layers.retain(|layer| overlaps_with(&layer.key_range, &filter));
|
||||
}
|
||||
all_layers.sort_by(|k1, k2| {
|
||||
(
|
||||
k1.is_delta,
|
||||
k1.key_range.start,
|
||||
k1.key_range.end,
|
||||
k1.lsn_range.start,
|
||||
k1.lsn_range.end,
|
||||
)
|
||||
.cmp(&(
|
||||
k2.is_delta,
|
||||
k2.key_range.start,
|
||||
k2.key_range.end,
|
||||
k2.lsn_range.start,
|
||||
k2.lsn_range.end,
|
||||
))
|
||||
});
|
||||
all_layers
|
||||
}
|
||||
|
||||
/// Exercises partial gc-compaction on a root timeline: three successive compactions
/// over key ranges 0..4, 4..10 and 0..10, checking the resulting layer map after each.
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_simple_partial_bottom_most_compaction() -> anyhow::Result<()> {
    /// Builds the `id`-th test key.
    fn get_key(id: u32) -> Key {
        // using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
        let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
        key.field6 = id;
        key
    }

    /// One delta-layer record: an image value for key `id` written at `lsn`.
    fn image_at(id: u32, lsn: Lsn, payload: &'static str) -> (Key, Lsn, Value) {
        (get_key(id), lsn, Value::Image(Bytes::from(payload)))
    }

    /// Expected layer key covering test keys `keys` and the given LSN range.
    fn layer(
        keys: std::ops::Range<u32>,
        lsn_range: std::ops::Range<Lsn>,
        is_delta: bool,
    ) -> PersistentLayerKey {
        PersistentLayerKey {
            key_range: get_key(keys.start)..get_key(keys.end),
            lsn_range,
            is_delta,
        }
    }

    let harness = TenantHarness::create("test_simple_partial_bottom_most_compaction").await?;
    let (tenant, ctx) = harness.load().await;

    // img layer at 0x10
    let img_layer = (0..10)
        .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
        .collect::<Vec<_>>();

    let delta1 = vec![
        image_at(1, Lsn(0x20), "value 1@0x20"),
        image_at(2, Lsn(0x30), "value 2@0x30"),
        image_at(3, Lsn(0x40), "value 3@0x40"),
    ];
    let delta2 = vec![
        image_at(5, Lsn(0x20), "value 5@0x20"),
        image_at(6, Lsn(0x20), "value 6@0x20"),
    ];
    let delta3 = vec![
        image_at(8, Lsn(0x48), "value 8@0x48"),
        image_at(9, Lsn(0x48), "value 9@0x48"),
    ];

    let delta_layers = vec![
        DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1),
        DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2),
        DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3),
    ];
    let image_layers = vec![(Lsn(0x10), img_layer)];
    let tline = tenant
        .create_test_timeline_with_layers(
            TIMELINE_ID,
            Lsn(0x10),
            DEFAULT_PG_VERSION,
            &ctx,
            delta_layers,
            image_layers,
            Lsn(0x50),
        )
        .await?;

    {
        // Update GC info
        let mut gc_info = tline.gc_info.write().unwrap();
        *gc_info = GcInfo {
            retain_lsns: vec![(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No)],
            cutoffs: GcCutoffs {
                time: Lsn(0x30),
                space: Lsn(0x30),
            },
            leases: Default::default(),
            within_ancestor_pitr: false,
        };
    }

    let cancel = CancellationToken::new();

    // The three delta layers are expected to survive every compaction step below;
    // this appends them to the image layers expected after a given step.
    let with_untouched_deltas = |mut expected: Vec<PersistentLayerKey>| {
        expected.extend([
            layer(1..4, Lsn(0x20)..Lsn(0x48), true),
            layer(5..7, Lsn(0x20)..Lsn(0x48), true),
            layer(8..10, Lsn(0x48)..Lsn(0x50), true),
        ]);
        expected
    };

    // Do a partial compaction on key range 0..4, we should generate an image layer; no other layers
    // can be removed because they might be used for other key ranges.
    tline
        .partial_compact_with_gc(Some(get_key(0)..get_key(4)), &cancel, EnumSet::new(), &ctx)
        .await
        .unwrap();
    let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
    assert_eq!(
        all_layers,
        with_untouched_deltas(vec![
            layer(0..4, Lsn(0x20)..Lsn(0x21), false),
            layer(0..10, Lsn(0x10)..Lsn(0x11), false),
        ])
    );

    // Do a partial compaction on key range 4..10
    tline
        .partial_compact_with_gc(Some(get_key(4)..get_key(10)), &cancel, EnumSet::new(), &ctx)
        .await
        .unwrap();
    let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
    assert_eq!(
        all_layers,
        with_untouched_deltas(vec![
            layer(0..4, Lsn(0x20)..Lsn(0x21), false),
            // if (in the future) GC kicks in, this layer will be removed
            layer(0..10, Lsn(0x10)..Lsn(0x11), false),
            layer(4..10, Lsn(0x20)..Lsn(0x21), false),
        ])
    );

    // Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones.
    tline
        .partial_compact_with_gc(Some(get_key(0)..get_key(10)), &cancel, EnumSet::new(), &ctx)
        .await
        .unwrap();
    let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
    assert_eq!(
        all_layers,
        with_untouched_deltas(vec![
            layer(0..4, Lsn(0x20)..Lsn(0x21), false),
            layer(0..10, Lsn(0x20)..Lsn(0x21), false),
            layer(4..10, Lsn(0x20)..Lsn(0x21), false),
        ])
    );

    Ok(())
}
|
||||
}
|
||||
|
||||
@@ -1716,20 +1716,32 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// An experimental compaction building block that combines compaction with garbage collection.
|
||||
///
|
||||
/// The current implementation picks all delta + image layers that are below or intersecting with
|
||||
/// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
|
||||
/// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
|
||||
/// and create delta layers with all deltas >= gc horizon.
|
||||
pub(crate) async fn compact_with_gc(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
use std::collections::BTreeSet;
|
||||
self.partial_compact_with_gc(None, cancel, flags, ctx).await
|
||||
}
|
||||
|
||||
/// An experimental compaction building block that combines compaction with garbage collection.
|
||||
///
|
||||
/// The current implementation picks all delta + image layers that are below or intersecting with
|
||||
/// the GC horizon without considering retain_lsns. Then, it does a full compaction over all these delta
|
||||
/// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
|
||||
/// and create delta layers with all deltas >= gc horizon.
|
||||
///
|
||||
/// If `compaction_key_range` is set, only the keys within that range are compacted (aka partial compaction). This functionality
|
||||
/// is not complete yet, and if it is set, only image layers will be generated.
|
||||
///
|
||||
pub(crate) async fn partial_compact_with_gc(
|
||||
self: &Arc<Self>,
|
||||
compaction_key_range: Option<Range<Key>>,
|
||||
cancel: &CancellationToken,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Block other compaction/GC tasks from running for now. GC-compaction could run along
|
||||
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
|
||||
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
|
||||
@@ -1750,8 +1762,13 @@ impl Timeline {
|
||||
.await?;
|
||||
|
||||
let dry_run = flags.contains(CompactFlags::DryRun);
|
||||
let partial_compaction = compaction_key_range.is_some();
|
||||
|
||||
info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
|
||||
if let Some(ref compaction_key_range) = compaction_key_range {
|
||||
info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compaction_key_range={}..{}", compaction_key_range.start, compaction_key_range.end);
|
||||
} else {
|
||||
info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
|
||||
}
|
||||
|
||||
scopeguard::defer! {
|
||||
info!("done enhanced gc bottom-most compaction");
|
||||
@@ -1763,7 +1780,7 @@ impl Timeline {
|
||||
// The layer selection has the following properties:
|
||||
// 1. If a layer is in the selection, all layers below it are in the selection.
|
||||
// 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
|
||||
let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
|
||||
let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = if !partial_compaction {
|
||||
let guard = self.layers.read().await;
|
||||
let layers = guard.layer_map()?;
|
||||
let gc_info = self.gc_info.read().unwrap();
|
||||
@@ -1779,7 +1796,7 @@ impl Timeline {
|
||||
retain_lsns_below_horizon.push(*lsn);
|
||||
}
|
||||
}
|
||||
let mut selected_layers = Vec::new();
|
||||
let mut selected_layers: Vec<Layer> = Vec::new();
|
||||
drop(gc_info);
|
||||
// Pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers.
|
||||
let Some(max_layer_lsn) = layers
|
||||
@@ -1804,8 +1821,52 @@ impl Timeline {
|
||||
}
|
||||
retain_lsns_below_horizon.sort();
|
||||
(selected_layers, gc_cutoff, retain_lsns_below_horizon)
|
||||
} else {
|
||||
// In case of partial compaction, we currently only support generating image layers, and therefore,
|
||||
// we pick all layers that end at or below the lowest retain_lsn (so that LSN does not intersect any of them) and that overlap the compaction key range.
|
||||
let guard = self.layers.read().await;
|
||||
let layers = guard.layer_map()?;
|
||||
let gc_info = self.gc_info.read().unwrap();
|
||||
let mut min_lsn = gc_info.cutoffs.select_min();
|
||||
for (lsn, _, _) in &gc_info.retain_lsns {
|
||||
if lsn < &min_lsn {
|
||||
min_lsn = *lsn;
|
||||
}
|
||||
}
|
||||
for lsn in gc_info.leases.keys() {
|
||||
if lsn < &min_lsn {
|
||||
min_lsn = *lsn;
|
||||
}
|
||||
}
|
||||
let mut selected_layers = Vec::new();
|
||||
drop(gc_info);
|
||||
// |-------| |-------| |-------|
|
||||
// | Delta | | Delta | | Delta | -- min_lsn could be intersecting with the layers
|
||||
// |-------| |-------| |-------| <- we want to pick all the layers below min_lsn, so that
|
||||
// | Delta | | Delta | | Delta | ...we can remove them after compaction
|
||||
// |-------| |-------| |-------|
|
||||
// Pick all the layers that end at or below min_lsn and overlap with the compaction key range.
|
||||
let Some(compaction_key_range) = compaction_key_range.as_ref() else {
|
||||
unreachable!()
|
||||
};
|
||||
for desc in layers.iter_historic_layers() {
|
||||
if desc.get_lsn_range().end <= min_lsn
|
||||
&& overlaps_with(&desc.key_range, compaction_key_range)
|
||||
{
|
||||
selected_layers.push(guard.get_from_desc(&desc));
|
||||
}
|
||||
}
|
||||
if selected_layers.is_empty() {
|
||||
info!("no layers to compact with gc");
|
||||
return Ok(());
|
||||
}
|
||||
(selected_layers, min_lsn, Vec::new())
|
||||
};
|
||||
let lowest_retain_lsn = if self.ancestor_timeline.is_some() {
|
||||
if partial_compaction {
|
||||
warn!("partial compaction cannot run on child branches (for now)");
|
||||
return Ok(());
|
||||
}
|
||||
Lsn(self.ancestor_lsn.0 + 1)
|
||||
} else {
|
||||
let res = retain_lsns_below_horizon
|
||||
@@ -1833,23 +1894,18 @@ impl Timeline {
|
||||
|
||||
self.check_compaction_space(&layer_selection).await?;
|
||||
|
||||
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
|
||||
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
|
||||
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
|
||||
// Generate statistics for the compaction
|
||||
for layer in &layer_selection {
|
||||
let desc = layer.layer_desc();
|
||||
if desc.is_delta() {
|
||||
// ignore single-key layer files
|
||||
if desc.key_range.start.next() != desc.key_range.end {
|
||||
let lsn_range = &desc.lsn_range;
|
||||
lsn_split_point.insert(lsn_range.start);
|
||||
lsn_split_point.insert(lsn_range.end);
|
||||
}
|
||||
stat.visit_delta_layer(desc.file_size());
|
||||
} else {
|
||||
stat.visit_image_layer(desc.file_size());
|
||||
}
|
||||
}
|
||||
|
||||
// Step 1: construct a k-merge iterator over all layers.
|
||||
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
|
||||
let layer_names: Vec<crate::tenant::storage_layer::LayerName> = layer_selection
|
||||
.iter()
|
||||
.map(|layer| layer.layer_desc().layer_name())
|
||||
@@ -1900,7 +1956,10 @@ impl Timeline {
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
Key::MIN,
|
||||
compaction_key_range
|
||||
.as_ref()
|
||||
.map(|x| x.start)
|
||||
.unwrap_or(Key::MIN),
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
ctx,
|
||||
@@ -1961,55 +2020,71 @@ impl Timeline {
|
||||
} else {
|
||||
let last_key = last_key.as_mut().unwrap();
|
||||
stat.on_unique_key_visited();
|
||||
let retention = self
|
||||
.generate_key_retention(
|
||||
*last_key,
|
||||
&accumulated_values,
|
||||
gc_cutoff,
|
||||
&retain_lsns_below_horizon,
|
||||
COMPACTION_DELTA_THRESHOLD,
|
||||
get_ancestor_image(self, *last_key, ctx).await?,
|
||||
)
|
||||
.await?;
|
||||
// Put the image into the image layer. Currently we have a single big layer for the compaction.
|
||||
retention
|
||||
.pipe_to(
|
||||
*last_key,
|
||||
&mut delta_layer_writer,
|
||||
image_layer_writer.as_mut(),
|
||||
&mut stat,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let skip_adding_key = if let Some(ref compaction_key_range) = compaction_key_range {
|
||||
!compaction_key_range.contains(last_key)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if !skip_adding_key {
|
||||
let retention = self
|
||||
.generate_key_retention(
|
||||
*last_key,
|
||||
&accumulated_values,
|
||||
gc_cutoff,
|
||||
&retain_lsns_below_horizon,
|
||||
COMPACTION_DELTA_THRESHOLD,
|
||||
get_ancestor_image(self, *last_key, ctx).await?,
|
||||
)
|
||||
.await?;
|
||||
// Put the image into the image layer. Currently we have a single big layer for the compaction.
|
||||
retention
|
||||
.pipe_to(
|
||||
*last_key,
|
||||
&mut delta_layer_writer,
|
||||
image_layer_writer.as_mut(),
|
||||
&mut stat,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
accumulated_values.clear();
|
||||
*last_key = key;
|
||||
accumulated_values.push((key, lsn, val));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: move the below part to the loop body
|
||||
let last_key = last_key.expect("no keys produced during compaction");
|
||||
// TODO: move this part to the loop body
|
||||
stat.on_unique_key_visited();
|
||||
let retention = self
|
||||
.generate_key_retention(
|
||||
last_key,
|
||||
&accumulated_values,
|
||||
gc_cutoff,
|
||||
&retain_lsns_below_horizon,
|
||||
COMPACTION_DELTA_THRESHOLD,
|
||||
get_ancestor_image(self, last_key, ctx).await?,
|
||||
)
|
||||
.await?;
|
||||
// Put the image into the image layer. Currently we have a single big layer for the compaction.
|
||||
retention
|
||||
.pipe_to(
|
||||
last_key,
|
||||
&mut delta_layer_writer,
|
||||
image_layer_writer.as_mut(),
|
||||
&mut stat,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let skip_adding_key = if let Some(ref compaction_key_range) = compaction_key_range {
|
||||
!compaction_key_range.contains(&last_key)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if !skip_adding_key {
|
||||
let retention = self
|
||||
.generate_key_retention(
|
||||
last_key,
|
||||
&accumulated_values,
|
||||
gc_cutoff,
|
||||
&retain_lsns_below_horizon,
|
||||
COMPACTION_DELTA_THRESHOLD,
|
||||
get_ancestor_image(self, last_key, ctx).await?,
|
||||
)
|
||||
.await?;
|
||||
// Put the image into the image layer. Currently we have a single big layer for the compaction.
|
||||
retention
|
||||
.pipe_to(
|
||||
last_key,
|
||||
&mut delta_layer_writer,
|
||||
image_layer_writer.as_mut(),
|
||||
&mut stat,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
// end: move the above part to the loop body
|
||||
|
||||
let discard = |key: &PersistentLayerKey| {
|
||||
let key = key.clone();
|
||||
@@ -2018,8 +2093,12 @@ impl Timeline {
|
||||
|
||||
let produced_image_layers = if let Some(writer) = image_layer_writer {
|
||||
if !dry_run {
|
||||
let end_key = compaction_key_range
|
||||
.as_ref()
|
||||
.map(|x| x.end)
|
||||
.unwrap_or(Key::MAX);
|
||||
writer
|
||||
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
|
||||
.finish_with_discard_fn(self, ctx, end_key, discard)
|
||||
.await?
|
||||
} else {
|
||||
drop(writer);
|
||||
@@ -2038,6 +2117,10 @@ impl Timeline {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
if partial_compaction && !produced_delta_layers.is_empty() {
|
||||
bail!("implementation error: partial compaction should not be producing delta layers (for now)");
|
||||
}
|
||||
|
||||
let mut compact_to = Vec::new();
|
||||
let mut keep_layers = HashSet::new();
|
||||
let produced_delta_layers_len = produced_delta_layers.len();
|
||||
@@ -2068,6 +2151,28 @@ impl Timeline {
|
||||
}
|
||||
let mut layer_selection = layer_selection;
|
||||
layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
|
||||
if let Some(ref compaction_key_range) = compaction_key_range {
|
||||
// Partial compaction might select more data than it processes, e.g., if
|
||||
// the compaction_key_range only partially overlaps:
|
||||
//
|
||||
// [---compaction_key_range---]
|
||||
// [---A----][----B----][----C----][----D----]
|
||||
//
|
||||
// A,B,C,D are all in the `layer_selection`. The created image layers contain
|
||||
// whatever is needed from B, C, and from `----]` of A, and from `[--` of D.
|
||||
//
|
||||
// In contrast, `[--A-` and `--D----]` have not been processed, so, we must
|
||||
// keep that data.
|
||||
//
|
||||
// The solution for now is to keep A and D completely.
|
||||
// (layer_selection is what we'll remove from the layer map, so,
|
||||
// retain what is _not_ fully covered by compaction_key_range).
|
||||
layer_selection.retain(|x| {
|
||||
let key_range = &x.layer_desc().key_range;
|
||||
key_range.start >= compaction_key_range.start
|
||||
&& key_range.end <= compaction_key_range.end
|
||||
});
|
||||
}
|
||||
|
||||
info!(
|
||||
"gc-compaction statistics: {}",
|
||||
|
||||
Reference in New Issue
Block a user