diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 7c6464dab3..cb3ca9c8b9 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3992,6 +3992,7 @@ mod tests { use storage_layer::PersistentLayerKey; use tests::storage_layer::ValuesReconstructState; use tests::timeline::{GetVectoredError, ShutdownMode}; + use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn}; use timeline::{DeltaLayerTestDesc, GcInfo}; use utils::bin_ser::BeSer; use utils::id::TenantId; @@ -7214,4 +7215,320 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_generate_key_retention() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_generate_key_retention").await?; + let (tenant, ctx) = harness.load().await; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await?; + tline.force_advance_lsn(Lsn(0x70)); + let key = Key::from_hex("010000000033333333444444445500000000").unwrap(); + let history = vec![ + ( + key, + Lsn(0x10), + Value::Image(Bytes::copy_from_slice(b"0x10")), + ), + ( + key, + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append(";0x20")), + ), + ( + key, + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append(";0x30")), + ), + ( + key, + Lsn(0x40), + Value::WalRecord(NeonWalRecord::wal_append(";0x40")), + ), + ( + key, + Lsn(0x50), + Value::WalRecord(NeonWalRecord::wal_append(";0x50")), + ), + ( + key, + Lsn(0x60), + Value::WalRecord(NeonWalRecord::wal_append(";0x60")), + ), + ( + key, + Lsn(0x70), + Value::WalRecord(NeonWalRecord::wal_append(";0x70")), + ), + ( + key, + Lsn(0x80), + Value::WalRecord(NeonWalRecord::wal_append(";0x80")), + ), + ( + key, + Lsn(0x90), + Value::WalRecord(NeonWalRecord::wal_append(";0x90")), + ), + ]; + let res = tline + .generate_key_retention( + key, + &history, + Lsn(0x60), + &[Lsn(0x20), Lsn(0x40), Lsn(0x50)], + 3, + ) + .await + .unwrap(); + let expected_res = KeyHistoryRetention { + below_horizon: vec![ + ( + Lsn(0x20), + 
KeyLogAtLsn(vec![( + Lsn(0x20), + Value::Image(Bytes::copy_from_slice(b"0x10;0x20")), + )]), + ), + ( + Lsn(0x40), + KeyLogAtLsn(vec![ + ( + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append(";0x30")), + ), + ( + Lsn(0x40), + Value::WalRecord(NeonWalRecord::wal_append(";0x40")), + ), + ]), + ), + ( + Lsn(0x50), + KeyLogAtLsn(vec![( + Lsn(0x50), + Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x30;0x40;0x50")), + )]), + ), + ( + Lsn(0x60), + KeyLogAtLsn(vec![( + Lsn(0x60), + Value::WalRecord(NeonWalRecord::wal_append(";0x60")), + )]), + ), + ], + above_horizon: KeyLogAtLsn(vec![ + ( + Lsn(0x70), + Value::WalRecord(NeonWalRecord::wal_append(";0x70")), + ), + ( + Lsn(0x80), + Value::WalRecord(NeonWalRecord::wal_append(";0x80")), + ), + ( + Lsn(0x90), + Value::WalRecord(NeonWalRecord::wal_append(";0x90")), + ), + ]), + }; + assert_eq!(res, expected_res); + // TODO: more tests with mixed image + delta, adding with k-merge test cases; e2e compaction test + Ok(()) + } + + #[tokio::test] + async fn test_simple_bottom_most_compaction_with_retain_lsns() -> anyhow::Result<()> { + let harness = + TenantHarness::create("test_simple_bottom_most_compaction_with_retain_lsns").await?; + let (tenant, ctx) = harness.load().await; + + fn get_key(id: u32) -> Key { + // using aux key here b/c they are guaranteed to be inside `collect_keyspace`. 
+ let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let img_layer = (0..10) + .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10")))) + .collect_vec(); + + let delta1 = vec![ + ( + get_key(1), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ( + get_key(2), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append("@0x30")), + ), + ( + get_key(3), + Lsn(0x28), + Value::WalRecord(NeonWalRecord::wal_append("@0x28")), + ), + ( + get_key(3), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append("@0x30")), + ), + ( + get_key(3), + Lsn(0x40), + Value::WalRecord(NeonWalRecord::wal_append("@0x40")), + ), + ]; + let delta2 = vec![ + ( + get_key(5), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ( + get_key(6), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ]; + let delta3 = vec![ + ( + get_key(8), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ( + get_key(9), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ]; + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + vec![ + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta1), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta2), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3), + ], // delta layers + vec![(Lsn(0x10), img_layer)], // image layers + Lsn(0x50), + ) + .await?; + { + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + *guard = GcInfo { + retain_lsns: vec![Lsn(0x10), Lsn(0x20)], + cutoffs: GcCutoffs { + time: Lsn(0x30), + space: Lsn(0x30), + }, + leases: Default::default(), + within_ancestor_pitr: false, + }; + } + + let expected_result = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + 
Bytes::from_static(b"value 2@0x10@0x30"), + Bytes::from_static(b"value 3@0x10@0x28@0x30@0x40"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10@0x20"), + Bytes::from_static(b"value 6@0x10@0x20"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10@0x48"), + Bytes::from_static(b"value 9@0x10@0x48"), + ]; + + let expected_result_at_gc_horizon = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10@0x30"), + Bytes::from_static(b"value 3@0x10@0x28@0x30"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10@0x20"), + Bytes::from_static(b"value 6@0x10@0x20"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_20 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10@0x20"), + Bytes::from_static(b"value 6@0x10@0x20"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_10 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let verify_result = || async { + for idx in 0..10 { + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + &expected_result[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x30), &ctx) + .await + .unwrap(), + 
&expected_result_at_gc_horizon[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x20), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_20[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x10), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_10[idx] + ); + } + }; + + verify_result().await; + + let cancel = CancellationToken::new(); + tline.compact_with_gc(&cancel, &ctx).await.unwrap(); + + verify_result().await; + + Ok(()) + } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 512e9e86fa..c73059c34a 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -460,7 +460,12 @@ impl DeltaLayerWriterInner { will_init: bool, ctx: &RequestContext, ) -> (Vec, anyhow::Result<()>) { - assert!(self.lsn_range.start <= lsn); + assert!( + self.lsn_range.start <= lsn, + "lsn_start={}, lsn={}", + self.lsn_range.start, + lsn + ); // We don't want to use compression in delta layer creation let compression = ImageCompressionAlgorithm::Disabled; let (val, res) = self diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 8866e1af5c..4bfcdc43e8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1,5 +1,5 @@ pub(crate) mod analysis; -mod compaction; +pub(crate) mod compaction; pub mod delete; pub(crate) mod detach_ancestor; mod eviction_task; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index a648432b4d..fb8c125b60 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -28,7 +28,7 @@ use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder} use crate::page_cache; use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD}; use 
crate::tenant::storage_layer::merge_iterator::MergeIterator; -use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc}; +use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc, ValueReconstructState}; use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter}; use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome}; use crate::tenant::timeline::{Layer, ResidentLayer}; @@ -36,7 +36,7 @@ use crate::tenant::DeltaLayer; use crate::virtual_file::{MaybeFatalIo, VirtualFile}; use crate::keyspace::KeySpace; -use crate::repository::Key; +use crate::repository::{Key, Value}; use utils::lsn::Lsn; @@ -45,6 +45,60 @@ use pageserver_compaction::interface::*; use super::CompactionError; +/// Maximum number of deltas before generating an image layer in bottom-most compaction. +const COMPACTION_DELTA_THRESHOLD: usize = 5; + +/// The result of bottom-most compaction for a single key at each LSN. +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq))] +pub struct KeyLogAtLsn(pub Vec<(Lsn, Value)>); + +/// The result of bottom-most compaction. +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq))] +pub(crate) struct KeyHistoryRetention { + /// Stores logs to reconstruct the value at the given LSN, that is to say, logs <= LSN or image == LSN. + pub(crate) below_horizon: Vec<(Lsn, KeyLogAtLsn)>, + /// Stores logs to reconstruct the value at any LSN above the horizon, that is to say, log > LSN. 
+ pub(crate) above_horizon: KeyLogAtLsn, +} + +impl KeyHistoryRetention { + async fn pipe_to( + self, + key: Key, + delta_writer: &mut Vec<(Key, Lsn, Value)>, + image_writer: &mut ImageLayerWriter, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let mut first_batch = true; + for (_, KeyLogAtLsn(logs)) in self.below_horizon { + if first_batch { + if logs.len() == 1 && logs[0].1.is_image() { + let Value::Image(img) = &logs[0].1 else { + unreachable!() + }; + image_writer.put_image(key, img.clone(), ctx).await?; + } else { + for (lsn, val) in logs { + delta_writer.push((key, lsn, val)); + } + } + first_batch = false; + } else { + for (lsn, val) in logs { + delta_writer.push((key, lsn, val)); + } + } + } + let KeyLogAtLsn(above_horizon_logs) = self.above_horizon; + for (lsn, val) in above_horizon_logs { + delta_writer.push((key, lsn, val)); + } + Ok(()) + } +} + impl Timeline { /// TODO: cancellation pub(crate) async fn compact_legacy( @@ -989,6 +1043,188 @@ impl Timeline { Ok(()) } + /// Take a list of images and deltas, produce images and deltas according to GC horizon and retain_lsns. + /// + /// It takes a key, the values of the key within the compaction process, a GC horizon, and all retain_lsns below the horizon. + /// For now, it requires that `history` contains the full history of the key (i.e., the key with the lowest LSN is + /// an image or a WAL not requiring a base image). This restriction will be removed once we implement gc-compaction on branch. + /// + /// The function returns the deltas and the base image that need to be placed at each of the retain LSNs. 
For example, we have: + /// + /// A@0x10, +B@0x20, +C@0x30, +D@0x40, +E@0x50, +F@0x60 + /// horizon = 0x50, retain_lsn = 0x20, 0x40, delta_threshold=3 + /// + /// The function will produce: + /// + /// ```plain + /// 0x20(retain_lsn) -> img=AB@0x20 always produce a single image below the lowest retain LSN + /// 0x40(retain_lsn) -> deltas=[+C@0x30, +D@0x40] two deltas since the last base image, keeping the deltas + /// 0x50(horizon) -> deltas=[ABCDE@0x50] three deltas since the last base image, generate an image but put it in the delta + /// above_horizon -> deltas=[+F@0x60] full history above the horizon + /// ``` + /// + /// Note that `history` must be sorted by LSN and should belong to a single key. + pub(crate) async fn generate_key_retention( + self: &Arc, + key: Key, + history: &[(Key, Lsn, Value)], + horizon: Lsn, + retain_lsn_below_horizon: &[Lsn], + delta_threshold_cnt: usize, + ) -> anyhow::Result { + // Pre-checks for the invariants + if cfg!(debug_assertions) { + for (log_key, _, _) in history { + assert_eq!(log_key, &key, "mismatched key"); + } + for i in 1..history.len() { + assert!(history[i - 1].1 <= history[i].1, "unordered LSN"); + if history[i - 1].1 == history[i].1 { + assert!( + matches!(history[i - 1].2, Value::Image(_)), + "unordered delta/image, or duplicated delta" + ); + } + } + if let Value::WalRecord(rec) = &history[0].2 { + assert!(rec.will_init(), "no base image"); + } + for lsn in retain_lsn_below_horizon { + assert!(lsn < &horizon, "retain lsn must be below horizon") + } + for i in 1..retain_lsn_below_horizon.len() { + assert!( + retain_lsn_below_horizon[i - 1] <= retain_lsn_below_horizon[i], + "unordered LSN" + ); + } + } + // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon, + // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket. 
+ let (mut split_history, lsn_split_points) = { + let mut split_history = Vec::new(); + split_history.resize_with(retain_lsn_below_horizon.len() + 2, Vec::new); + let mut lsn_split_points = Vec::with_capacity(retain_lsn_below_horizon.len() + 1); + for lsn in retain_lsn_below_horizon { + lsn_split_points.push(*lsn); + } + lsn_split_points.push(horizon); + let mut current_idx = 0; + for item @ (_, lsn, _) in history { + while current_idx < lsn_split_points.len() && *lsn > lsn_split_points[current_idx] { + current_idx += 1; + } + split_history[current_idx].push(item); + } + (split_history, lsn_split_points) + }; + // Step 2: filter out duplicated records due to the k-merge of image/delta layers + for split_for_lsn in &mut split_history { + let mut prev_lsn = None; + let mut new_split_for_lsn = Vec::with_capacity(split_for_lsn.len()); + for record @ (_, lsn, _) in std::mem::take(split_for_lsn) { + if let Some(prev_lsn) = &prev_lsn { + if *prev_lsn == lsn { + // The case that we have an LSN with both data from the delta layer and the image layer. As + // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply + // drop this delta and keep the image. + // + // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will + // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply + // dropped. 
+ continue; + } + } + prev_lsn = Some(lsn); + new_split_for_lsn.push(record); + } + *split_for_lsn = new_split_for_lsn; + } + // Step 3: generate images when necessary + let mut retention = Vec::with_capacity(split_history.len()); + let mut records_since_last_image = 0; + let batch_cnt = split_history.len(); + assert!( + batch_cnt >= 2, + "should have at least below + above horizon batches" + ); + let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new(); + for (i, split_for_lsn) in split_history.into_iter().enumerate() { + records_since_last_image += split_for_lsn.len(); + let generate_image = if i == 0 { + // We always generate images for the first batch (below horizon / lowest retain_lsn) + true + } else if i == batch_cnt - 1 { + // Do not generate images for the last batch (above horizon) + false + } else if records_since_last_image >= delta_threshold_cnt { + // Generate images when there are too many records + true + } else { + false + }; + replay_history.extend(split_for_lsn.iter().map(|x| (*x).clone())); + if let Some((_, _, val)) = replay_history.first() { + assert!(val.will_init(), "invalid history, no base image"); + } + // Only retain the items after the last image record + for idx in (0..replay_history.len()).rev() { + if replay_history[idx].2.will_init() { + replay_history = replay_history[idx..].to_vec(); + break; + } + } + if generate_image && records_since_last_image > 0 { + records_since_last_image = 0; + let history = std::mem::take(&mut replay_history); + let mut img = None; + let mut records = Vec::with_capacity(history.len()); + if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() { + img = Some((*lsn, val.clone())); + for (_, lsn, val) in history.into_iter().skip(1) { + let Value::WalRecord(rec) = val else { + panic!("invalid record") + }; + records.push((lsn, rec)); + } + } else { + for (_, lsn, val) in history.into_iter() { + let Value::WalRecord(rec) = val else { + panic!("invalid record") + }; + records.push((lsn, 
rec)); + } + } + records.reverse(); + let state = ValueReconstructState { img, records }; + let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range + let img = self.reconstruct_value(key, request_lsn, state).await?; + replay_history.push((key, request_lsn, Value::Image(img.clone()))); + retention.push(vec![(request_lsn, Value::Image(img))]); + } else { + retention.push( + split_for_lsn + .iter() + .map(|(_, lsn, value)| (*lsn, value.clone())) + .collect(), + ); + } + } + let mut result = Vec::with_capacity(retention.len()); + assert_eq!(retention.len(), lsn_split_points.len() + 1); + for (idx, logs) in retention.into_iter().enumerate() { + if idx == lsn_split_points.len() { + return Ok(KeyHistoryRetention { + below_horizon: result, + above_horizon: KeyLogAtLsn(logs), + }); + } else { + result.push((lsn_split_points[idx], KeyLogAtLsn(logs))); + } + } + unreachable!() + } + /// An experimental compaction building block that combines compaction with garbage collection. /// /// The current implementation picks all delta + image layers that are below or intersecting with @@ -1000,7 +1236,6 @@ impl Timeline { _cancel: &CancellationToken, ctx: &RequestContext, ) -> Result<(), CompactionError> { - use crate::tenant::storage_layer::ValueReconstructState; use std::collections::BTreeSet; info!("running enhanced gc bottom-most compaction"); @@ -1013,30 +1248,51 @@ impl Timeline { // The layer selection has the following properties: // 1. If a layer is in the selection, all layers below it are in the selection. // 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection. 
- let (layer_selection, gc_cutoff) = { + let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = { let guard = self.layers.read().await; let layers = guard.layer_map(); let gc_info = self.gc_info.read().unwrap(); - if !gc_info.retain_lsns.is_empty() || !gc_info.leases.is_empty() { - return Err(CompactionError::Other(anyhow!( - "enhanced legacy compaction currently does not support retain_lsns (branches)" - ))); - } + let mut retain_lsns_below_horizon = Vec::new(); let gc_cutoff = gc_info.cutoffs.select_min(); + for lsn in &gc_info.retain_lsns { + if lsn < &gc_cutoff { + retain_lsns_below_horizon.push(*lsn); + } + } + for lsn in gc_info.leases.keys() { + if lsn < &gc_cutoff { + retain_lsns_below_horizon.push(*lsn); + } + } let mut selected_layers = Vec::new(); - // TODO: consider retain_lsns drop(gc_info); for desc in layers.iter_historic_layers() { if desc.get_lsn_range().start <= gc_cutoff { selected_layers.push(guard.get_from_desc(&desc)); } } - (selected_layers, gc_cutoff) + retain_lsns_below_horizon.sort(); + (selected_layers, gc_cutoff, retain_lsns_below_horizon) }; + let lowest_retain_lsn = retain_lsns_below_horizon + .first() + .copied() + .unwrap_or(gc_cutoff); + if cfg!(debug_assertions) { + assert_eq!( + lowest_retain_lsn, + retain_lsns_below_horizon + .iter() + .min() + .copied() + .unwrap_or(gc_cutoff) + ); + } info!( - "picked {} layers for compaction with gc_cutoff={}", + "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}", layer_selection.len(), - gc_cutoff + gc_cutoff, + lowest_retain_lsn ); // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs. // Also, collect the layer information to decide when to split the new delta layers. @@ -1072,61 +1328,13 @@ impl Timeline { let mut accumulated_values = Vec::new(); let mut last_key: Option = None; - /// Take a list of images and deltas, produce an image at the GC horizon, and a list of deltas above the GC horizon. 
- async fn flush_accumulated_states( - tline: &Arc, - key: Key, - accumulated_values: &[(Key, Lsn, crate::repository::Value)], - horizon: Lsn, - ) -> anyhow::Result<(Vec<(Key, Lsn, crate::repository::Value)>, bytes::Bytes)> { - let mut base_image = None; - let mut keys_above_horizon = Vec::new(); - let mut delta_above_base_image = Vec::new(); - // We have a list of deltas/images. We want to create image layers while collect garbages. - for (key, lsn, val) in accumulated_values.iter().rev() { - if *lsn > horizon { - if let Some((_, prev_lsn, _)) = keys_above_horizon.last_mut() { - if *prev_lsn == *lsn { - // The case that we have an LSN with both data from the delta layer and the image layer. As - // `ValueWrapper` ensures that an image is ordered before a delta at the same LSN, we simply - // drop this delta and keep the image. - // - // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will - // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply - // dropped. 
- continue; - } - } - keys_above_horizon.push((*key, *lsn, val.clone())); - } else if *lsn <= horizon { - match val { - crate::repository::Value::Image(image) => { - base_image = Some((*lsn, image.clone())); - break; - } - crate::repository::Value::WalRecord(wal) => { - delta_above_base_image.push((*lsn, wal.clone())); - } - } - } - } - // do not reverse delta_above_base_image, reconstruct state expects reversely-ordered records - keys_above_horizon.reverse(); - let state = ValueReconstructState { - img: base_image, - records: delta_above_base_image, - }; - let img = tline.reconstruct_value(key, horizon, state).await?; - Ok((keys_above_horizon, img)) - } - async fn flush_deltas( deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>, last_key: Key, delta_split_points: &[Key], current_delta_split_point: &mut usize, tline: &Arc, - gc_cutoff: Lsn, + lowest_retain_lsn: Lsn, ctx: &RequestContext, ) -> anyhow::Result> { // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid @@ -1161,7 +1369,7 @@ impl Timeline { tline.timeline_id, tline.tenant_shard_id, deltas.first().unwrap().0, - gc_cutoff..end_lsn, + lowest_retain_lsn..end_lsn, ctx, ) .await?; @@ -1178,7 +1386,7 @@ impl Timeline { self.timeline_id, self.tenant_shard_id, &(Key::MIN..Key::MAX), // covers the full key range - gc_cutoff, + lowest_retain_lsn, ctx, ) .await?; @@ -1195,12 +1403,19 @@ impl Timeline { accumulated_values.push((key, lsn, val)); } else { let last_key = last_key.as_mut().unwrap(); - let (deltas, image) = - flush_accumulated_states(self, *last_key, &accumulated_values, gc_cutoff) - .await?; + let retention = self + .generate_key_retention( + *last_key, + &accumulated_values, + gc_cutoff, + &retain_lsns_below_horizon, + COMPACTION_DELTA_THRESHOLD, + ) + .await?; // Put the image into the image layer. Currently we have a single big layer for the compaction. 
- image_layer_writer.put_image(*last_key, image, ctx).await?; - delta_values.extend(deltas); + retention + .pipe_to(*last_key, &mut delta_values, &mut image_layer_writer, ctx) + .await?; delta_layers.extend( flush_deltas( &mut delta_values, @@ -1208,7 +1423,7 @@ impl Timeline { &delta_split_points, &mut current_delta_split_point, self, - gc_cutoff, + lowest_retain_lsn, ctx, ) .await?, @@ -1221,11 +1436,19 @@ impl Timeline { let last_key = last_key.expect("no keys produced during compaction"); // TODO: move this part to the loop body - let (deltas, image) = - flush_accumulated_states(self, last_key, &accumulated_values, gc_cutoff).await?; + let retention = self + .generate_key_retention( + last_key, + &accumulated_values, + gc_cutoff, + &retain_lsns_below_horizon, + COMPACTION_DELTA_THRESHOLD, + ) + .await?; // Put the image into the image layer. Currently we have a single big layer for the compaction. - image_layer_writer.put_image(last_key, image, ctx).await?; - delta_values.extend(deltas); + retention + .pipe_to(last_key, &mut delta_values, &mut image_layer_writer, ctx) + .await?; delta_layers.extend( flush_deltas( &mut delta_values, @@ -1233,7 +1456,7 @@ impl Timeline { &delta_split_points, &mut current_delta_split_point, self, - gc_cutoff, + lowest_retain_lsn, ctx, ) .await?,