feat(pageserver): support retain_lsn in bottommost gc-compaction (#8328)

part of https://github.com/neondatabase/neon/issues/8002

The main thing in this pull request is the new `generate_key_retention`
function. It decides which deltas to retain and generate images for a
given key based on its history + retain_lsn + horizon.

On that, we generate a flat single level of delta layers over all deltas
included in the compaction. In the future, we can decide whether to
split them over the LSN axis as described in the RFC.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
Alex Chi Z.
2024-07-23 19:28:43 -04:00
committed by GitHub
parent 39a35671df
commit 18cf5cfefd
4 changed files with 622 additions and 77 deletions

View File

@@ -3992,6 +3992,7 @@ mod tests {
use storage_layer::PersistentLayerKey;
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
use timeline::{DeltaLayerTestDesc, GcInfo};
use utils::bin_ser::BeSer;
use utils::id::TenantId;
@@ -7214,4 +7215,320 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_generate_key_retention() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_generate_key_retention").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
tline.force_advance_lsn(Lsn(0x70));
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let history = vec![
(
key,
Lsn(0x10),
Value::Image(Bytes::copy_from_slice(b"0x10")),
),
(
key,
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append(";0x20")),
),
(
key,
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(";0x30")),
),
(
key,
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append(";0x40")),
),
(
key,
Lsn(0x50),
Value::WalRecord(NeonWalRecord::wal_append(";0x50")),
),
(
key,
Lsn(0x60),
Value::WalRecord(NeonWalRecord::wal_append(";0x60")),
),
(
key,
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
),
(
key,
Lsn(0x80),
Value::WalRecord(NeonWalRecord::wal_append(";0x80")),
),
(
key,
Lsn(0x90),
Value::WalRecord(NeonWalRecord::wal_append(";0x90")),
),
];
let res = tline
.generate_key_retention(
key,
&history,
Lsn(0x60),
&[Lsn(0x20), Lsn(0x40), Lsn(0x50)],
3,
)
.await
.unwrap();
let expected_res = KeyHistoryRetention {
below_horizon: vec![
(
Lsn(0x20),
KeyLogAtLsn(vec![(
Lsn(0x20),
Value::Image(Bytes::copy_from_slice(b"0x10;0x20")),
)]),
),
(
Lsn(0x40),
KeyLogAtLsn(vec![
(
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append(";0x30")),
),
(
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append(";0x40")),
),
]),
),
(
Lsn(0x50),
KeyLogAtLsn(vec![(
Lsn(0x50),
Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x30;0x40;0x50")),
)]),
),
(
Lsn(0x60),
KeyLogAtLsn(vec![(
Lsn(0x60),
Value::WalRecord(NeonWalRecord::wal_append(";0x60")),
)]),
),
],
above_horizon: KeyLogAtLsn(vec![
(
Lsn(0x70),
Value::WalRecord(NeonWalRecord::wal_append(";0x70")),
),
(
Lsn(0x80),
Value::WalRecord(NeonWalRecord::wal_append(";0x80")),
),
(
Lsn(0x90),
Value::WalRecord(NeonWalRecord::wal_append(";0x90")),
),
]),
};
assert_eq!(res, expected_res);
// TODO: more tests with mixed image + delta, adding with k-merge test cases; e2e compaction test
Ok(())
}
#[tokio::test]
async fn test_simple_bottom_most_compaction_with_retain_lsns() -> anyhow::Result<()> {
let harness =
TenantHarness::create("test_simple_bottom_most_compaction_with_retain_lsns").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let img_layer = (0..10)
.map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10"))))
.collect_vec();
let delta1 = vec![
(
get_key(1),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
(
get_key(2),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
),
(
get_key(3),
Lsn(0x28),
Value::WalRecord(NeonWalRecord::wal_append("@0x28")),
),
(
get_key(3),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append("@0x30")),
),
(
get_key(3),
Lsn(0x40),
Value::WalRecord(NeonWalRecord::wal_append("@0x40")),
),
];
let delta2 = vec![
(
get_key(5),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
(
get_key(6),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_append("@0x20")),
),
];
let delta3 = vec![
(
get_key(8),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
(
get_key(9),
Lsn(0x48),
Value::WalRecord(NeonWalRecord::wal_append("@0x48")),
),
];
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta1),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta2),
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3),
], // delta layers
vec![(Lsn(0x10), img_layer)], // image layers
Lsn(0x50),
)
.await?;
{
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![Lsn(0x10), Lsn(0x20)],
cutoffs: GcCutoffs {
time: Lsn(0x30),
space: Lsn(0x30),
},
leases: Default::default(),
within_ancestor_pitr: false,
};
}
let expected_result = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10@0x28@0x30@0x40"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10@0x48"),
Bytes::from_static(b"value 9@0x10@0x48"),
];
let expected_result_at_gc_horizon = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10@0x30"),
Bytes::from_static(b"value 3@0x10@0x28@0x30"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10"),
Bytes::from_static(b"value 9@0x10"),
];
let expected_result_at_lsn_20 = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10@0x20"),
Bytes::from_static(b"value 2@0x10"),
Bytes::from_static(b"value 3@0x10"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10@0x20"),
Bytes::from_static(b"value 6@0x10@0x20"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10"),
Bytes::from_static(b"value 9@0x10"),
];
let expected_result_at_lsn_10 = [
Bytes::from_static(b"value 0@0x10"),
Bytes::from_static(b"value 1@0x10"),
Bytes::from_static(b"value 2@0x10"),
Bytes::from_static(b"value 3@0x10"),
Bytes::from_static(b"value 4@0x10"),
Bytes::from_static(b"value 5@0x10"),
Bytes::from_static(b"value 6@0x10"),
Bytes::from_static(b"value 7@0x10"),
Bytes::from_static(b"value 8@0x10"),
Bytes::from_static(b"value 9@0x10"),
];
let verify_result = || async {
for idx in 0..10 {
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x50), &ctx)
.await
.unwrap(),
&expected_result[idx]
);
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x30), &ctx)
.await
.unwrap(),
&expected_result_at_gc_horizon[idx]
);
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x20), &ctx)
.await
.unwrap(),
&expected_result_at_lsn_20[idx]
);
assert_eq!(
tline
.get(get_key(idx as u32), Lsn(0x10), &ctx)
.await
.unwrap(),
&expected_result_at_lsn_10[idx]
);
}
};
verify_result().await;
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
verify_result().await;
Ok(())
}
}

View File

@@ -460,7 +460,12 @@ impl DeltaLayerWriterInner {
will_init: bool,
ctx: &RequestContext,
) -> (Vec<u8>, anyhow::Result<()>) {
assert!(self.lsn_range.start <= lsn);
assert!(
self.lsn_range.start <= lsn,
"lsn_start={}, lsn={}",
self.lsn_range.start,
lsn
);
// We don't want to use compression in delta layer creation
let compression = ImageCompressionAlgorithm::Disabled;
let (val, res) = self

View File

@@ -1,5 +1,5 @@
pub(crate) mod analysis;
mod compaction;
pub(crate) mod compaction;
pub mod delete;
pub(crate) mod detach_ancestor;
mod eviction_task;

View File

@@ -28,7 +28,7 @@ use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}
use crate::page_cache;
use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc, ValueReconstructState};
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
use crate::tenant::timeline::{Layer, ResidentLayer};
@@ -36,7 +36,7 @@ use crate::tenant::DeltaLayer;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use crate::keyspace::KeySpace;
use crate::repository::Key;
use crate::repository::{Key, Value};
use utils::lsn::Lsn;
@@ -45,6 +45,60 @@ use pageserver_compaction::interface::*;
use super::CompactionError;
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
const COMPACTION_DELTA_THRESHOLD: usize = 5;
/// The result of bottom-most compaction for a single key at each LSN.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>);
/// The result of bottom-most compaction.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub(crate) struct KeyHistoryRetention {
/// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN.
pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>,
/// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN.
pub(crate) above_horizon: KeyLogAtLsn,
}
impl KeyHistoryRetention {
async fn pipe_to(
self,
key: Key,
delta_writer: &mut Vec<(Key, Lsn, Value)>,
image_writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut first_batch = true;
for (_, KeyLogAtLsn(logs)) in self.below_horizon {
if first_batch {
if logs.len() == 1 && logs[0].1.is_image() {
let Value::Image(img) = &logs[0].1 else {
unreachable!()
};
image_writer.put_image(key, img.clone(), ctx).await?;
} else {
for (lsn, val) in logs {
delta_writer.push((key, lsn, val));
}
}
first_batch = false;
} else {
for (lsn, val) in logs {
delta_writer.push((key, lsn, val));
}
}
}
let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
for (lsn, val) in above_horizon_logs {
delta_writer.push((key, lsn, val));
}
Ok(())
}
}
impl Timeline {
/// TODO: cancellation
pub(crate) async fn compact_legacy(
@@ -989,6 +1043,188 @@ impl Timeline {
Ok(())
}
/// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns.
///
/// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon.
/// For now, it requires the `accumulated_values` contains the full history of the key (i.e., the key with the lowest LSN is
/// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch.
///
/// The function returns the deltas and the base image that need to be placed at each of the retain LSN. For example, we have:
///
/// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60
/// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3
///
/// The function will produce:
///
/// ```plain
/// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN
/// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas
/// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta
/// above_horizon -> deltas=[+F@0x60] full history above the horizon
/// ```
///
/// Note that `accumulated_values` must be sorted by LSN and should belong to a single key.
pub(crate) async fn generate_key_retention(
self: &Arc<Timeline>,
key: Key,
history: &[(Key, Lsn, Value)],
horizon: Lsn,
retain_lsn_below_horizon: &[Lsn],
delta_threshold_cnt: usize,
) -> anyhow::Result<KeyHistoryRetention> {
// Pre-checks for the invariants
if cfg!(debug_assertions) {
for (log_key, _, _) in history {
assert_eq!(log_key, &key, "mismatched key");
}
for i in 1..history.len() {
assert!(history[i - 1].1 <= history[i].1, "unordered LSN");
if history[i - 1].1 == history[i].1 {
assert!(
matches!(history[i - 1].2, Value::Image(_)),
"unordered delta/image, or duplicated delta"
);
}
}
if let Value::WalRecord(rec) = &history[0].2 {
assert!(rec.will_init(), "no base image");
}
for lsn in retain_lsn_below_horizon {
assert!(lsn < &horizon, "retain lsn must be below horizon")
}
for i in 1..retain_lsn_below_horizon.len() {
assert!(
retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i],
"unordered LSN"
);
}
}
// Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon,
// and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket.
let (mut split_history, lsn_split_points) = {
let mut split_history = Vec::new();
split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new);
let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1);
for lsn in retain_lsn_below_horizon {
lsn_split_points.push(*lsn);
}
lsn_split_points.push(horizon);
let mut current_idx = 0;
for item @ (_, lsn, _) in history {
while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] {
current_idx += 1;
}
split_history[current_idx].push(item);
}
(split_history, lsn_split_points)
};
// Step 2: filter out duplicated records due to the k-merge of image/delta layers
for split_for_lsn in &mut split_history {
let mut prev_lsn = None;
let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len());
for record @ (_, lsn, _) in std::mem::take(split_for_lsn) {
if let Some(prev_lsn) = &prev_lsn {
if *prev_lsn == lsn {
// The case that we have an LSN with both data from the delta layer and the image layer. As
// `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
// drop this delta and keep the image.
//
// For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
// keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
// dropped.
continue;
}
}
prev_lsn = Some(lsn);
new_split_for_lsn.push(record);
}
*split_for_lsn = new_split_for_lsn;
}
// Step 3: generate images when necessary
let mut retention = Vec::with_capacity(split_history.len());
let mut records_since_last_image = 0;
let batch_cnt = split_history.len();
assert!(
batch_cnt >= 2,
"should have at least below + above horizon batches"
);
let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new();
for (i, split_for_lsn) in split_history.into_iter().enumerate() {
records_since_last_image += split_for_lsn.len();
let generate_image = if i == 0 {
// We always generate images for the first batch (below horizon / lowest retain_lsn)
true
} else if i == batch_cnt - 1 {
// Do not generate images for the last batch (above horizon)
false
} else if records_since_last_image >= delta_threshold_cnt {
// Generate images when there are too many records
true
} else {
false
};
replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone()));
if let Some((_, _, val)) = replay_history.first() {
assert!(val.will_init(), "invalid history, no base image");
}
// Only retain the items after the last image record
for idx in (0..replay_history.len()).rev() {
if replay_history[idx].2.will_init() {
replay_history = replay_history[idx..].to_vec();
break;
}
}
if generate_image && records_since_last_image > 0 {
records_since_last_image = 0;
let history = std::mem::take(&mut replay_history);
let mut img = None;
let mut records = Vec::with_capacity(history.len());
if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
img = Some((*lsn, val.clone()));
for (_, lsn, val) in history.into_iter().skip(1) {
let Value::WalRecord(rec) = val else {
panic!("invalid record")
};
records.push((lsn, rec));
}
} else {
for (_, lsn, val) in history.into_iter() {
let Value::WalRecord(rec) = val else {
panic!("invalid record")
};
records.push((lsn, rec));
}
}
records.reverse();
let state = ValueReconstructState { img, records };
let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
let img = self.reconstruct_value(key, request_lsn, state).await?;
replay_history.push((key, request_lsn, Value::Image(img.clone())));
retention.push(vec![(request_lsn, Value::Image(img))]);
} else {
retention.push(
split_for_lsn
.iter()
.map(|(_, lsn, value)| (*lsn, value.clone()))
.collect(),
);
}
}
let mut result = Vec::with_capacity(retention.len());
assert_eq!(retention.len(), lsn_split_points.len() + 1);
for (idx, logs) in retention.into_iter().enumerate() {
if idx == lsn_split_points.len() {
return Ok(KeyHistoryRetention {
below_horizon: result,
above_horizon: KeyLogAtLsn(logs),
});
} else {
result.push((lsn_split_points[idx], KeyLogAtLsn(logs)));
}
}
unreachable!()
}
/// An experimental compaction building block that combines compaction with garbage collection.
///
/// The current implementation picks all delta + image layers that are below or intersecting with
@@ -1000,7 +1236,6 @@ impl Timeline {
_cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
use crate::tenant::storage_layer::ValueReconstructState;
use std::collections::BTreeSet;
info!("running enhanced gc bottom-most compaction");
@@ -1013,30 +1248,51 @@ impl Timeline {
// The layer selection has the following properties:
// 1. If a layer is in the selection, all layers below it are in the selection.
// 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
let (layer_selection, gc_cutoff) = {
let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let gc_info = self.gc_info.read().unwrap();
if !gc_info.retain_lsns.is_empty() || !gc_info.leases.is_empty() {
return Err(CompactionError::Other(anyhow!(
"enhanced legacy compaction currently does not support retain_lsns (branches)"
)));
}
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = gc_info.cutoffs.select_min();
for lsn in &gc_info.retain_lsns {
if lsn < &gc_cutoff {
retain_lsns_below_horizon.push(*lsn);
}
}
for lsn in gc_info.leases.keys() {
if lsn < &gc_cutoff {
retain_lsns_below_horizon.push(*lsn);
}
}
let mut selected_layers = Vec::new();
// TODO: consider retain_lsns
drop(gc_info);
for desc in layers.iter_historic_layers() {
if desc.get_lsn_range().start <= gc_cutoff {
selected_layers.push(guard.get_from_desc(&desc));
}
}
(selected_layers, gc_cutoff)
retain_lsns_below_horizon.sort();
(selected_layers, gc_cutoff, retain_lsns_below_horizon)
};
let lowest_retain_lsn = retain_lsns_below_horizon
.first()
.copied()
.unwrap_or(gc_cutoff);
if cfg!(debug_assertions) {
assert_eq!(
lowest_retain_lsn,
retain_lsns_below_horizon
.iter()
.min()
.copied()
.unwrap_or(gc_cutoff)
);
}
info!(
"picked {} layers for compaction with gc_cutoff={}",
"picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}",
layer_selection.len(),
gc_cutoff
gc_cutoff,
lowest_retain_lsn
);
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
// Also, collect the layer information to decide when to split the new delta layers.
@@ -1072,61 +1328,13 @@ impl Timeline {
let mut accumulated_values = Vec::new();
let mut last_key: Option<Key> = None;
/// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon.
async fn flush_accumulated_states(
tline: &Arc<Timeline>,
key: Key,
accumulated_values: &[(Key, Lsn, crate::repository::Value)],
horizon: Lsn,
) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> {
let mut base_image = None;
let mut keys_above_horizon = Vec::new();
let mut delta_above_base_image = Vec::new();
// We have a list of deltas/images. We want to create image layers while collect garbages.
for (key, lsn, val) in accumulated_values.iter().rev() {
if *lsn > horizon {
if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() {
if *prev_lsn == *lsn {
// The case that we have an LSN with both data from the delta layer and the image layer. As
// `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply
// drop this delta and keep the image.
//
// For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will
// keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply
// dropped.
continue;
}
}
keys_above_horizon.push((*key, *lsn, val.clone()));
} else if *lsn <= horizon {
match val {
crate::repository::Value::Image(image) => {
base_image = Some((*lsn, image.clone()));
break;
}
crate::repository::Value::WalRecord(wal) => {
delta_above_base_image.push((*lsn, wal.clone()));
}
}
}
}
// do not reverse delta_above_base_image, reconstruct state expects reversely-ordered records
keys_above_horizon.reverse();
let state = ValueReconstructState {
img: base_image,
records: delta_above_base_image,
};
let img = tline.reconstruct_value(key, horizon, state).await?;
Ok((keys_above_horizon, img))
}
async fn flush_deltas(
deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>,
last_key: Key,
delta_split_points: &[Key],
current_delta_split_point: &mut usize,
tline: &Arc<Timeline>,
gc_cutoff: Lsn,
lowest_retain_lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Option<ResidentLayer>> {
// Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
@@ -1161,7 +1369,7 @@ impl Timeline {
tline.timeline_id,
tline.tenant_shard_id,
deltas.first().unwrap().0,
gc_cutoff..end_lsn,
lowest_retain_lsn..end_lsn,
ctx,
)
.await?;
@@ -1178,7 +1386,7 @@ impl Timeline {
self.timeline_id,
self.tenant_shard_id,
&(Key::MIN..Key::MAX), // covers the full key range
gc_cutoff,
lowest_retain_lsn,
ctx,
)
.await?;
@@ -1195,12 +1403,19 @@ impl Timeline {
accumulated_values.push((key, lsn, val));
} else {
let last_key = last_key.as_mut().unwrap();
let (deltas, image) =
flush_accumulated_states(self, *last_key, &accumulated_values, gc_cutoff)
.await?;
let retention = self
.generate_key_retention(
*last_key,
&accumulated_values,
gc_cutoff,
&retain_lsns_below_horizon,
COMPACTION_DELTA_THRESHOLD,
)
.await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
image_layer_writer.put_image(*last_key, image, ctx).await?;
delta_values.extend(deltas);
retention
.pipe_to(*last_key, &mut delta_values, &mut image_layer_writer, ctx)
.await?;
delta_layers.extend(
flush_deltas(
&mut delta_values,
@@ -1208,7 +1423,7 @@ impl Timeline {
&delta_split_points,
&mut current_delta_split_point,
self,
gc_cutoff,
lowest_retain_lsn,
ctx,
)
.await?,
@@ -1221,11 +1436,19 @@ impl Timeline {
let last_key = last_key.expect("no keys produced during compaction");
// TODO: move this part to the loop body
let (deltas, image) =
flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?;
let retention = self
.generate_key_retention(
last_key,
&accumulated_values,
gc_cutoff,
&retain_lsns_below_horizon,
COMPACTION_DELTA_THRESHOLD,
)
.await?;
// Put the image into the image layer. Currently we have a single big layer for the compaction.
image_layer_writer.put_image(last_key, image, ctx).await?;
delta_values.extend(deltas);
retention
.pipe_to(last_key, &mut delta_values, &mut image_layer_writer, ctx)
.await?;
delta_layers.extend(
flush_deltas(
&mut delta_values,
@@ -1233,7 +1456,7 @@ impl Timeline {
&delta_split_points,
&mut current_delta_split_point,
self,
gc_cutoff,
lowest_retain_lsn,
ctx,
)
.await?,