diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index e5ac6725ad..48c1851a3a 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -7347,6 +7347,7 @@ mod tests { Lsn(0x60), &[Lsn(0x20), Lsn(0x40), Lsn(0x50)], 3, + None, ) .await .unwrap(); @@ -7471,7 +7472,7 @@ mod tests { ), ]; let res = tline - .generate_key_retention(key, &history, Lsn(0x60), &[Lsn(0x40), Lsn(0x50)], 3) + .generate_key_retention(key, &history, Lsn(0x60), &[Lsn(0x40), Lsn(0x50)], 3, None) .await .unwrap(); let expected_res = KeyHistoryRetention { @@ -7517,6 +7518,114 @@ mod tests { }; assert_eq!(res, expected_res); + // In case of branch compaction, the branch itself does not have the full history, and we need to provide + // the ancestor image in the test case. + + let history = vec![ + ( + key, + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append(";0x20")), + ), + ( + key, + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append(";0x30")), + ), + ( + key, + Lsn(0x40), + Value::WalRecord(NeonWalRecord::wal_append(";0x40")), + ), + ( + key, + Lsn(0x70), + Value::WalRecord(NeonWalRecord::wal_append(";0x70")), + ), + ]; + let res = tline + .generate_key_retention( + key, + &history, + Lsn(0x60), + &[], + 3, + Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))), + ) + .await + .unwrap(); + let expected_res = KeyHistoryRetention { + below_horizon: vec![( + Lsn(0x60), + KeyLogAtLsn(vec![( + Lsn(0x60), + Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x30;0x40")), // use the ancestor image to reconstruct the page + )]), + )], + above_horizon: KeyLogAtLsn(vec![( + Lsn(0x70), + Value::WalRecord(NeonWalRecord::wal_append(";0x70")), + )]), + }; + assert_eq!(res, expected_res); + + let history = vec![ + ( + key, + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append(";0x20")), + ), + ( + key, + Lsn(0x40), + Value::WalRecord(NeonWalRecord::wal_append(";0x40")), + ), + ( + key, + Lsn(0x60), + Value::WalRecord(NeonWalRecord::wal_append(";0x60")), + ), + ( + key, + Lsn(0x70), + Value::WalRecord(NeonWalRecord::wal_append(";0x70")), + ), + ]; + let res = tline + .generate_key_retention( + key, + &history, + Lsn(0x60), + &[Lsn(0x30)], + 3, + Some((key, Lsn(0x10), Bytes::copy_from_slice(b"0x10"))), + ) + .await + .unwrap(); + let expected_res = KeyHistoryRetention { + below_horizon: vec![ + ( + Lsn(0x30), + KeyLogAtLsn(vec![( + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append(";0x20")), + )]), + ), + ( + Lsn(0x60), + KeyLogAtLsn(vec![( + Lsn(0x60), + Value::Image(Bytes::copy_from_slice(b"0x10;0x20;0x40;0x60")), + )]), + ), + ], + above_horizon: KeyLogAtLsn(vec![( + Lsn(0x70), + Value::WalRecord(NeonWalRecord::wal_append(";0x70")), + )]), + }; + assert_eq!(res, expected_res); + Ok(()) } @@ -7715,4 +7824,186 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?; + let (tenant, ctx) = harness.load().await; + + fn get_key(id: u32) -> Key { + let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let img_layer = (0..10) + .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10")))) + .collect_vec(); + + let delta1 = vec![ + ( + get_key(1), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ( + get_key(2), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append("@0x30")), + ), + ( + get_key(3), + Lsn(0x28), + Value::WalRecord(NeonWalRecord::wal_append("@0x28")), + ), + ( + get_key(3), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append("@0x30")), + ), + ( + get_key(3), + Lsn(0x40), + Value::WalRecord(NeonWalRecord::wal_append("@0x40")), + ), + ]; + let delta2 = vec![ + ( + get_key(5), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ( + get_key(6), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ]; + let delta3 = vec![ + ( + get_key(8), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ( + get_key(9), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ]; + + let parent_tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + vec![], // delta layers + vec![(Lsn(0x18), img_layer)], // image layers + Lsn(0x18), + ) + .await?; + + parent_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10))); + + let branch_tline = tenant + .branch_timeline_test_with_layers( + &parent_tline, + NEW_TIMELINE_ID, + Some(Lsn(0x18)), + &ctx, + vec![ + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x48)..Lsn(0x50), delta3), + ], // delta layers + vec![], // image layers + Lsn(0x50), + ) + .await?; + + branch_tline.add_extra_test_dense_keyspace(KeySpace::single(get_key(0)..get_key(10))); + + { + // Update GC info + let mut guard = parent_tline.gc_info.write().unwrap(); + *guard = GcInfo { + retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id)], + cutoffs: GcCutoffs { + time: Lsn(0x10), + space: Lsn(0x10), + }, + leases: Default::default(), + within_ancestor_pitr: false, + }; + } + + { + // Update GC info + let mut guard = branch_tline.gc_info.write().unwrap(); + *guard = GcInfo { + retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id)], + cutoffs: GcCutoffs { + time: Lsn(0x50), + space: Lsn(0x50), + }, + leases: Default::default(), + within_ancestor_pitr: false, + }; + } + + let expected_result_at_gc_horizon = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10@0x30"), + Bytes::from_static(b"value 3@0x10@0x28@0x30@0x40"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10@0x20"), + Bytes::from_static(b"value 6@0x10@0x20"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10@0x48"), + Bytes::from_static(b"value 9@0x10@0x48"), + ]; + + let expected_result_at_lsn_40 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10@0x30"), + Bytes::from_static(b"value 3@0x10@0x28@0x30@0x40"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10@0x20"), + Bytes::from_static(b"value 6@0x10@0x20"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let verify_result = || async { + for idx in 0..10 { + assert_eq!( + branch_tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + &expected_result_at_gc_horizon[idx] + ); + assert_eq!( + branch_tline + .get(get_key(idx as u32), Lsn(0x40), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_40[idx] + ); + } + }; + + verify_result().await; + + let cancel = CancellationToken::new(); + branch_tline.compact_with_gc(&cancel, &ctx).await.unwrap(); + + verify_result().await; + + Ok(()) + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2b205db6e1..4db44a3a19 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -529,7 +529,6 @@ impl GetVectoredError { } } -#[derive(Debug)] pub struct MissingKeyError { key: Key, shard: ShardNumber, @@ -540,6 +539,12 @@ pub struct MissingKeyError { backtrace: Option, } +impl std::fmt::Debug for MissingKeyError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self) + } +} + impl std::fmt::Display for MissingKeyError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 7bfa8e9d35..5e9ff1c9e4 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -15,6 +15,7 @@ use super::{ }; use anyhow::{anyhow, Context}; +use bytes::Bytes; use enumset::EnumSet; use fail::fail_point; use itertools::Itertools; @@ -69,17 +70,21 @@ impl KeyHistoryRetention { self, key: Key, delta_writer: &mut Vec<(Key, Lsn, Value)>, - image_writer: &mut ImageLayerWriter, + mut image_writer: Option<&mut ImageLayerWriter>, ctx: &RequestContext, ) -> anyhow::Result<()> { let mut first_batch = true; - for (_, KeyLogAtLsn(logs)) in self.below_horizon { + for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon { if first_batch { if logs.len() == 1 && logs[0].1.is_image() { let Value::Image(img) = &logs[0].1 else { unreachable!() }; - image_writer.put_image(key, img.clone(), ctx).await?; + if let Some(image_writer) = image_writer.as_mut() { + image_writer.put_image(key, img.clone(), ctx).await?; + } else { + delta_writer.push((key, cutoff_lsn, Value::Image(img.clone()))); + } } else { for (lsn, val) in logs { delta_writer.push((key, lsn, val)); @@ -1328,6 +1333,7 @@ impl Timeline { horizon: Lsn, retain_lsn_below_horizon: &[Lsn], delta_threshold_cnt: usize, + base_img_from_ancestor: Option<(Key, Lsn, Bytes)>, ) -> anyhow::Result { // Pre-checks for the invariants if cfg!(debug_assertions) { @@ -1357,6 +1363,7 @@ impl Timeline { ); } } + let has_ancestor = base_img_from_ancestor.is_some(); // Step 1: split history into len(retain_lsn_below_horizon) + 2 buckets, where the last bucket is for all deltas above the horizon, // and the second-to-last bucket is for the horizon. Each bucket contains lsn_last_bucket < deltas <= lsn_this_bucket. let (mut split_history, lsn_split_points) = { @@ -1390,6 +1397,9 @@ impl Timeline { // For example, we have delta layer key1@0x10, key1@0x20, and image layer key1@0x10, we will // keep the image for key1@0x10 and the delta for key1@0x20. key1@0x10 delta will be simply // dropped. + // + // TODO: in case we have both delta + images for a given LSN and it does not exceed the delta + // threshold, we could have kept delta instead to save space. This is an optimization for the future. continue; } } @@ -1407,9 +1417,13 @@ impl Timeline { "should have at least below + above horizon batches" ); let mut replay_history: Vec<(Key, Lsn, Value)> = Vec::new(); + if let Some((key, lsn, img)) = base_img_from_ancestor { + replay_history.push((key, lsn, Value::Image(img))); + } for (i, split_for_lsn) in split_history.into_iter().enumerate() { + // TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly. records_since_last_image += split_for_lsn.len(); - let generate_image = if i == 0 { + let generate_image = if i == 0 && !has_ancestor { // We always generate images for the first batch (below horizon / lowest retain_lsn) true } else if i == batch_cnt - 1 { @@ -1532,20 +1546,25 @@ impl Timeline { retain_lsns_below_horizon.sort(); (selected_layers, gc_cutoff, retain_lsns_below_horizon) }; - let lowest_retain_lsn = retain_lsns_below_horizon - .first() - .copied() - .unwrap_or(gc_cutoff); - if cfg!(debug_assertions) { - assert_eq!( - lowest_retain_lsn, - retain_lsns_below_horizon - .iter() - .min() - .copied() - .unwrap_or(gc_cutoff) - ); - } + let lowest_retain_lsn = if self.ancestor_timeline.is_some() { + Lsn(self.ancestor_lsn.0 + 1) + } else { + let res = retain_lsns_below_horizon + .first() + .copied() + .unwrap_or(gc_cutoff); + if cfg!(debug_assertions) { + assert_eq!( + res, + retain_lsns_below_horizon + .iter() + .min() + .copied() + .unwrap_or(gc_cutoff) + ); + } + res + }; info!( "picked {} layers for compaction with gc_cutoff={} lowest_retain_lsn={}", layer_selection.len(), @@ -1586,6 +1605,7 @@ impl Timeline { let mut accumulated_values = Vec::new(); let mut last_key: Option = None; + #[allow(clippy::too_many_arguments)] async fn flush_deltas( deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>, last_key: Key, @@ -1594,6 +1614,7 @@ impl Timeline { tline: &Arc, lowest_retain_lsn: Lsn, ctx: &RequestContext, + last_batch: bool, ) -> anyhow::Result> { // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid // overlapping layers. @@ -1614,7 +1635,7 @@ impl Timeline { *current_delta_split_point += 1; need_split = true; } - if !need_split { + if !need_split && !last_batch { return Ok(None); } let deltas = std::mem::take(deltas); @@ -1639,15 +1660,44 @@ impl Timeline { Ok(Some(delta_layer)) } - let mut image_layer_writer = ImageLayerWriter::new( - self.conf, - self.timeline_id, - self.tenant_shard_id, - &(Key::MIN..Key::MAX), // covers the full key range - lowest_retain_lsn, - ctx, - ) - .await?; + // Only create image layers when there is no ancestor branches. TODO: create covering image layer + // when some condition meet. + let mut image_layer_writer = if self.ancestor_timeline.is_none() { + Some( + ImageLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + &(Key::MIN..Key::MAX), // covers the full key range + lowest_retain_lsn, + ctx, + ) + .await?, + ) + } else { + None + }; + + /// Returns None if there is no ancestor branch. Throw an error when the key is not found. + /// + /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image + /// is needed for reconstruction. This should be fixed in the future. + /// + /// Furthermore, we should do vectored get instead of a single get, or better, use k-merge for ancestor + /// images. + async fn get_ancestor_image( + tline: &Arc, + key: Key, + ctx: &RequestContext, + ) -> anyhow::Result> { + if tline.ancestor_timeline.is_none() { + return Ok(None); + }; + // This function is implemented as a get of the current timeline at ancestor LSN, therefore reusing + // as much existing code as possible. + let img = tline.get(key, tline.ancestor_lsn, ctx).await?; + Ok(Some((key, tline.ancestor_lsn, img))) + } let mut delta_values = Vec::new(); let delta_split_points = delta_split_points.into_iter().collect_vec(); @@ -1668,11 +1718,17 @@ impl Timeline { gc_cutoff, &retain_lsns_below_horizon, COMPACTION_DELTA_THRESHOLD, + get_ancestor_image(self, *last_key, ctx).await?, ) .await?; // Put the image into the image layer. Currently we have a single big layer for the compaction. retention - .pipe_to(*last_key, &mut delta_values, &mut image_layer_writer, ctx) + .pipe_to( + *last_key, + &mut delta_values, + image_layer_writer.as_mut(), + ctx, + ) .await?; delta_layers.extend( flush_deltas( @@ -1683,6 +1739,7 @@ impl Timeline { self, lowest_retain_lsn, ctx, + false, ) .await?, ); @@ -1701,11 +1758,17 @@ impl Timeline { gc_cutoff, &retain_lsns_below_horizon, COMPACTION_DELTA_THRESHOLD, + get_ancestor_image(self, last_key, ctx).await?, ) .await?; // Put the image into the image layer. Currently we have a single big layer for the compaction. retention - .pipe_to(last_key, &mut delta_values, &mut image_layer_writer, ctx) + .pipe_to( + last_key, + &mut delta_values, + image_layer_writer.as_mut(), + ctx, + ) .await?; delta_layers.extend( flush_deltas( @@ -1716,19 +1779,25 @@ impl Timeline { self, lowest_retain_lsn, ctx, + true, ) .await?, ); + assert!(delta_values.is_empty(), "unprocessed keys"); - let image_layer = image_layer_writer.finish(self, ctx).await?; + let image_layer = if let Some(writer) = image_layer_writer { + Some(writer.finish(self, ctx).await?) + } else { + None + }; info!( "produced {} delta layers and {} image layers", delta_layers.len(), - 1 + if image_layer.is_some() { 1 } else { 0 } ); let mut compact_to = Vec::new(); compact_to.extend(delta_layers); - compact_to.push(image_layer); + compact_to.extend(image_layer); // Step 3: Place back to the layer map. { let mut guard = self.layers.write().await;