From 0f6568426342f80c6faba81af9827cc4ad9fa2d6 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 27 Aug 2024 02:19:47 +0800 Subject: [PATCH] feat(pageserver): use split layer writer in gc-compaction (#8608) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Part of #8002, the final big PR in the batch. ## Summary of changes This pull request uses the new split layer writer in gc-compaction. * It changes how layers are split. Previously, we split layers based on the original split points, but this created too many layers (test_gc_feedback has one key per layer). * Therefore, we first verify that the layer map can be processed by the current algorithm (see https://github.com/neondatabase/neon/pull/8191; it's basically the same check). * Once that check passes, we proceed with the compaction. This way, it creates layers that are large enough to be close to the target layer size. * Added a new set of `*_with_discard_fn` functions to the split layer writer. These let us skip a layer if we would otherwise produce one with the same persistent layer key. * The delta writer will keep the updates of the same key in a single file. This might create a super large layer, but we can optimize it later. * The split layer writer is used in the gc-compaction algorithm, and it will split layers based on size. * Fixed the image layer summary block, which encoded the wrong key range. 
--------- Signed-off-by: Alex Chi Z Co-authored-by: Arpad Müller Co-authored-by: Christian Schwarz --- libs/pageserver_api/src/key.rs | 9 + pageserver/src/tenant.rs | 220 ++++++++- pageserver/src/tenant/storage_layer.rs | 1 - .../src/tenant/storage_layer/delta_layer.rs | 4 +- .../src/tenant/storage_layer/image_layer.rs | 25 +- pageserver/src/tenant/storage_layer/layer.rs | 2 + .../src/tenant/storage_layer/split_writer.rs | 369 ++++++++++++--- pageserver/src/tenant/timeline.rs | 17 +- pageserver/src/tenant/timeline/compaction.rs | 447 ++++++++---------- 9 files changed, 751 insertions(+), 343 deletions(-) diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 2fdd7de38f..77da58d63e 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -236,6 +236,15 @@ impl Key { field5: u8::MAX, field6: u32::MAX, }; + /// A key slightly smaller than [`Key::MAX`], for use in layer key ranges so that they are not confused with L0 layers + pub const NON_L0_MAX: Key = Key { + field1: u8::MAX, + field2: u32::MAX, + field3: u32::MAX, + field4: u32::MAX, + field5: u8::MAX, + field6: u32::MAX - 1, + }; pub fn from_hex(s: &str) -> Result { if s.len() != 36 { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d3589a12c8..0364d521b6 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -7071,18 +7071,14 @@ mod tests { vec![ // Image layer at GC horizon PersistentLayerKey { - key_range: { - let mut key = Key::MAX; - key.field6 -= 1; - Key::MIN..key - }, + key_range: Key::MIN..Key::NON_L0_MAX, lsn_range: Lsn(0x30)..Lsn(0x31), is_delta: false }, - // The delta layer that is cut in the middle + // The delta layer covers the full range (with the layer key hack to avoid being recognized as L0) PersistentLayerKey { - key_range: get_key(3)..get_key(4), - lsn_range: Lsn(0x30)..Lsn(0x41), + key_range: Key::MIN..Key::NON_L0_MAX, + lsn_range: Lsn(0x30)..Lsn(0x48), is_delta: true }, // The delta3 layer that 
should not be picked for the compaction @@ -8062,6 +8058,214 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_simple_bottom_most_compaction_with_retain_lsns_single_key() -> anyhow::Result<()> + { + let harness = + TenantHarness::create("test_simple_bottom_most_compaction_with_retain_lsns_single_key") + .await?; + let (tenant, ctx) = harness.load().await; + + fn get_key(id: u32) -> Key { + // using aux key here b/c they are guaranteed to be inside `collect_keyspace`. + let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let img_layer = (0..10) + .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10")))) + .collect_vec(); + + let delta1 = vec![ + ( + get_key(1), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ( + get_key(1), + Lsn(0x28), + Value::WalRecord(NeonWalRecord::wal_append("@0x28")), + ), + ]; + let delta2 = vec![ + ( + get_key(1), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append("@0x30")), + ), + ( + get_key(1), + Lsn(0x38), + Value::WalRecord(NeonWalRecord::wal_append("@0x38")), + ), + ]; + let delta3 = vec![ + ( + get_key(8), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ( + get_key(9), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ]; + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + vec![ + // delta1 and delta 2 only contain a single key but multiple updates + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x30), delta1), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x30)..Lsn(0x50), delta2), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x50), delta3), + ], // delta layers + vec![(Lsn(0x10), img_layer)], // image layers + Lsn(0x50), + ) + .await?; + { + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + *guard = GcInfo { + retain_lsns: vec![ + (Lsn(0x10), 
tline.timeline_id), + (Lsn(0x20), tline.timeline_id), + ], + cutoffs: GcCutoffs { + time: Lsn(0x30), + space: Lsn(0x30), + }, + leases: Default::default(), + within_ancestor_pitr: false, + }; + } + + let expected_result = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30@0x38"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10@0x48"), + Bytes::from_static(b"value 9@0x10@0x48"), + ]; + + let expected_result_at_gc_horizon = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_20 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_10 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let verify_result = || 
async { + let gc_horizon = { + let gc_info = tline.gc_info.read().unwrap(); + gc_info.cutoffs.time + }; + for idx in 0..10 { + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + &expected_result[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), gc_horizon, &ctx) + .await + .unwrap(), + &expected_result_at_gc_horizon[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x20), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_20[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x10), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_10[idx] + ); + } + }; + + verify_result().await; + + let cancel = CancellationToken::new(); + let mut dryrun_flags = EnumSet::new(); + dryrun_flags.insert(CompactFlags::DryRun); + + tline + .compact_with_gc(&cancel, dryrun_flags, &ctx) + .await + .unwrap(); + // We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs + // cleaning things up, and therefore, we don't do sanity checks on the layer map during unit tests. 
+ verify_result().await; + + tline + .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .await + .unwrap(); + verify_result().await; + + // compact again + tline + .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .await + .unwrap(); + verify_result().await; + + Ok(()) + } + #[tokio::test] async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> { let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 133b34b8b5..a1202ad507 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -8,7 +8,6 @@ mod layer_desc; mod layer_name; pub mod merge_iterator; -#[cfg(test)] pub mod split_writer; use crate::context::{AccessStatsBehavior, RequestContext}; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b1b5217f7f..f4a2957972 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -36,6 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi use crate::tenant::disk_btree::{ DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection, }; +use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, @@ -568,7 +569,6 @@ impl DeltaLayerWriterInner { // 5GB limit for objects without multipart upload (which we don't want to use) // Make it a little bit below to account for differing GB units // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html - const S3_UPLOAD_LIMIT: u64 = 4_500_000_000; ensure!( metadata.len() <= S3_UPLOAD_LIMIT, "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!", @@ 
-702,12 +702,10 @@ impl DeltaLayerWriter { self.inner.take().unwrap().finish(key_end, ctx).await } - #[cfg(test)] pub(crate) fn num_keys(&self) -> usize { self.inner.as_ref().unwrap().num_keys } - #[cfg(test)] pub(crate) fn estimated_size(&self) -> u64 { let inner = self.inner.as_ref().unwrap(); inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64 diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 94120a4e3e..3cb2b1c83a 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -716,10 +716,6 @@ struct ImageLayerWriterInner { } impl ImageLayerWriterInner { - fn size(&self) -> u64 { - self.tree.borrow_writer().size() + self.blob_writer.size() - } - /// /// Start building a new image layer. /// @@ -854,13 +850,19 @@ impl ImageLayerWriterInner { res?; } + let final_key_range = if let Some(end_key) = end_key { + self.key_range.start..end_key + } else { + self.key_range.clone() + }; + // Fill in the summary on blk 0 let summary = Summary { magic: IMAGE_FILE_MAGIC, format_version: STORAGE_FORMAT_VERSION, tenant_id: self.tenant_shard_id.tenant_id, timeline_id: self.timeline_id, - key_range: self.key_range.clone(), + key_range: final_key_range.clone(), lsn: self.lsn, index_start_blk, index_root_blk, @@ -881,11 +883,7 @@ impl ImageLayerWriterInner { let desc = PersistentLayerDesc::new_img( self.tenant_shard_id, self.timeline_id, - if let Some(end_key) = end_key { - self.key_range.start..end_key - } else { - self.key_range.clone() - }, + final_key_range, self.lsn, metadata.len(), ); @@ -974,14 +972,12 @@ impl ImageLayerWriter { self.inner.as_mut().unwrap().put_image(key, img, ctx).await } - #[cfg(test)] /// Estimated size of the image layer. 
pub(crate) fn estimated_size(&self) -> u64 { let inner = self.inner.as_ref().unwrap(); inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64 } - #[cfg(test)] pub(crate) fn num_keys(&self) -> usize { self.inner.as_ref().unwrap().num_keys } @@ -997,7 +993,6 @@ impl ImageLayerWriter { self.inner.take().unwrap().finish(timeline, ctx, None).await } - #[cfg(test)] /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive. pub(super) async fn finish_with_end_key( mut self, @@ -1011,10 +1006,6 @@ impl ImageLayerWriter { .finish(timeline, ctx, Some(end_key)) .await } - - pub(crate) fn size(&self) -> u64 { - self.inner.as_ref().unwrap().size() - } } impl Drop for ImageLayerWriter { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 2607b574e7..53bb66b95e 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -35,6 +35,8 @@ mod tests; #[cfg(test)] mod failpoints; +pub const S3_UPLOAD_LIMIT: u64 = 4_500_000_000; + /// A Layer contains all data in a "rectangle" consisting of a range of keys and /// range of LSNs. 
/// diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index e12e29cd45..df910b5ad9 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -1,4 +1,4 @@ -use std::{ops::Range, sync::Arc}; +use std::{future::Future, ops::Range, sync::Arc}; use bytes::Bytes; use pageserver_api::key::{Key, KEY_SIZE}; @@ -7,7 +7,32 @@ use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId}; use crate::tenant::storage_layer::Layer; use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline}; -use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer}; +use super::layer::S3_UPLOAD_LIMIT; +use super::{ + DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer, +}; + +pub(crate) enum SplitWriterResult { + Produced(ResidentLayer), + Discarded(PersistentLayerKey), +} + +#[cfg(test)] +impl SplitWriterResult { + fn into_resident_layer(self) -> ResidentLayer { + match self { + SplitWriterResult::Produced(layer) => layer, + SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"), + } + } + + fn into_discarded_layer(self) -> PersistentLayerKey { + match self { + SplitWriterResult::Produced(_) => panic!("unexpected produced layer"), + SplitWriterResult::Discarded(layer) => layer, + } + } +} /// An image writer that takes images and produces multiple image layers. 
The interface does not /// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files @@ -16,11 +41,12 @@ use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer}; pub struct SplitImageLayerWriter { inner: ImageLayerWriter, target_layer_size: u64, - generated_layers: Vec, + generated_layers: Vec, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, lsn: Lsn, + start_key: Key, } impl SplitImageLayerWriter { @@ -49,16 +75,22 @@ impl SplitImageLayerWriter { timeline_id, tenant_shard_id, lsn, + start_key, }) } - pub async fn put_image( + pub async fn put_image_with_discard_fn( &mut self, key: Key, img: Bytes, tline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result<()> { + discard: D, + ) -> anyhow::Result<()> + where + D: FnOnce(&PersistentLayerKey) -> F, + F: Future, + { // The current estimation is an upper bound of the space that the key/image could take // because we did not consider compression in this estimation. The resulting image layer // could be smaller than the target size. 
@@ -76,33 +108,87 @@ impl SplitImageLayerWriter { ) .await?; let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); - self.generated_layers.push( - prev_image_writer - .finish_with_end_key(tline, key, ctx) - .await?, - ); + let layer_key = PersistentLayerKey { + key_range: self.start_key..key, + lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), + is_delta: false, + }; + self.start_key = key; + + if discard(&layer_key).await { + drop(prev_image_writer); + self.generated_layers + .push(SplitWriterResult::Discarded(layer_key)); + } else { + self.generated_layers.push(SplitWriterResult::Produced( + prev_image_writer + .finish_with_end_key(tline, key, ctx) + .await?, + )); + } } self.inner.put_image(key, img, ctx).await } - pub(crate) async fn finish( + #[cfg(test)] + pub async fn put_image( + &mut self, + key: Key, + img: Bytes, + tline: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false }) + .await + } + + pub(crate) async fn finish_with_discard_fn( self, tline: &Arc, ctx: &RequestContext, end_key: Key, - ) -> anyhow::Result> { + discard: D, + ) -> anyhow::Result> + where + D: FnOnce(&PersistentLayerKey) -> F, + F: Future, + { let Self { mut generated_layers, inner, .. 
} = self; - generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?); + if inner.num_keys() == 0 { + return Ok(generated_layers); + } + let layer_key = PersistentLayerKey { + key_range: self.start_key..end_key, + lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), + is_delta: false, + }; + if discard(&layer_key).await { + generated_layers.push(SplitWriterResult::Discarded(layer_key)); + } else { + generated_layers.push(SplitWriterResult::Produced( + inner.finish_with_end_key(tline, end_key, ctx).await?, + )); + } Ok(generated_layers) } + #[cfg(test)] + pub(crate) async fn finish( + self, + tline: &Arc, + ctx: &RequestContext, + end_key: Key, + ) -> anyhow::Result> { + self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) + .await + } + /// When split writer fails, the caller should call this function and handle partially generated layers. - #[allow(dead_code)] - pub(crate) async fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { + pub(crate) fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { Ok((self.generated_layers, self.inner)) } } @@ -110,15 +196,21 @@ impl SplitImageLayerWriter { /// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not /// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files /// to be cleaned up). +/// +/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched +/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm +/// will split them into multiple files based on size. 
#[must_use] pub struct SplitDeltaLayerWriter { inner: DeltaLayerWriter, target_layer_size: u64, - generated_layers: Vec, + generated_layers: Vec, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, lsn_range: Range, + last_key_written: Key, + start_key: Key, } impl SplitDeltaLayerWriter { @@ -147,9 +239,74 @@ impl SplitDeltaLayerWriter { timeline_id, tenant_shard_id, lsn_range, + last_key_written: Key::MIN, + start_key, }) } + /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end. + pub async fn put_value_with_discard_fn( + &mut self, + key: Key, + lsn: Lsn, + val: Value, + tline: &Arc, + ctx: &RequestContext, + discard: D, + ) -> anyhow::Result<()> + where + D: FnOnce(&PersistentLayerKey) -> F, + F: Future, + { + // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate + // number, and therefore the final layer size could be a little bit larger or smaller than the target. + // + // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction + // strategy. 
https://github.com/neondatabase/neon/issues/8837 + let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */; + if self.inner.num_keys() >= 1 + && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size + { + if key != self.last_key_written { + let next_delta_writer = DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + key, + self.lsn_range.clone(), + ctx, + ) + .await?; + let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); + let layer_key = PersistentLayerKey { + key_range: self.start_key..key, + lsn_range: self.lsn_range.clone(), + is_delta: true, + }; + self.start_key = key; + if discard(&layer_key).await { + drop(prev_delta_writer); + self.generated_layers + .push(SplitWriterResult::Discarded(layer_key)); + } else { + let (desc, path) = prev_delta_writer.finish(key, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + self.generated_layers + .push(SplitWriterResult::Produced(delta_layer)); + } + } else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT { + // We have to produce a very large file b/c a key is updated too often. + anyhow::bail!( + "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced", + key, + self.inner.estimated_size() + ); + } + } + self.last_key_written = key; + self.inner.put_value(key, lsn, val, ctx).await + } + pub async fn put_value( &mut self, key: Key, @@ -158,56 +315,64 @@ impl SplitDeltaLayerWriter { tline: &Arc, ctx: &RequestContext, ) -> anyhow::Result<()> { - // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate - // number, and therefore the final layer size could be a little bit larger or smaller than the target. 
- let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */; - if self.inner.num_keys() >= 1 - && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size - { - let next_delta_writer = DeltaLayerWriter::new( - self.conf, - self.timeline_id, - self.tenant_shard_id, - key, - self.lsn_range.clone(), - ctx, - ) - .await?; - let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); - let (desc, path) = prev_delta_writer.finish(key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - self.generated_layers.push(delta_layer); - } - self.inner.put_value(key, lsn, val, ctx).await + self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false }) + .await } - pub(crate) async fn finish( + pub(crate) async fn finish_with_discard_fn( self, tline: &Arc, ctx: &RequestContext, end_key: Key, - ) -> anyhow::Result> { + discard: D, + ) -> anyhow::Result> + where + D: FnOnce(&PersistentLayerKey) -> F, + F: Future, + { let Self { mut generated_layers, inner, .. } = self; - - let (desc, path) = inner.finish(end_key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - generated_layers.push(delta_layer); + if inner.num_keys() == 0 { + return Ok(generated_layers); + } + let layer_key = PersistentLayerKey { + key_range: self.start_key..end_key, + lsn_range: self.lsn_range.clone(), + is_delta: true, + }; + if discard(&layer_key).await { + generated_layers.push(SplitWriterResult::Discarded(layer_key)); + } else { + let (desc, path) = inner.finish(end_key, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + generated_layers.push(SplitWriterResult::Produced(delta_layer)); + } Ok(generated_layers) } - /// When split writer fails, the caller should call this function and handle partially generated layers. 
#[allow(dead_code)] - pub(crate) async fn take(self) -> anyhow::Result<(Vec, DeltaLayerWriter)> { + pub(crate) async fn finish( + self, + tline: &Arc, + ctx: &RequestContext, + end_key: Key, + ) -> anyhow::Result> { + self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) + .await + } + + /// When split writer fails, the caller should call this function and handle partially generated layers. + pub(crate) fn take(self) -> anyhow::Result<(Vec, DeltaLayerWriter)> { Ok((self.generated_layers, self.inner)) } } #[cfg(test)] mod tests { + use itertools::Itertools; use rand::{RngCore, SeedableRng}; use crate::{ @@ -302,9 +467,16 @@ mod tests { #[tokio::test] async fn write_split() { - let harness = TenantHarness::create("split_writer_write_split") - .await - .unwrap(); + write_split_helper("split_writer_write_split", false).await; + } + + #[tokio::test] + async fn write_split_discard() { + write_split_helper("split_writer_write_split_discard", false).await; + } + + async fn write_split_helper(harness_name: &'static str, discard: bool) { + let harness = TenantHarness::create(harness_name).await.unwrap(); let (tenant, ctx) = harness.load().await; let tline = tenant @@ -338,16 +510,19 @@ mod tests { for i in 0..N { let i = i as u32; image_writer - .put_image(get_key(i), get_large_img(), &tline, &ctx) + .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async { + discard + }) .await .unwrap(); delta_writer - .put_value( + .put_value_with_discard_fn( get_key(i), Lsn(0x20), Value::Image(get_large_img()), &tline, &ctx, + |_| async { discard }, ) .await .unwrap(); @@ -360,22 +535,39 @@ mod tests { .finish(&tline, &ctx, get_key(N as u32)) .await .unwrap(); - assert_eq!(image_layers.len(), N / 512 + 1); - assert_eq!(delta_layers.len(), N / 512 + 1); - for idx in 0..image_layers.len() { - assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); - assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); - 
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN); - assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX); - if idx > 0 { - assert_eq!( - image_layers[idx - 1].layer_desc().key_range.end, - image_layers[idx].layer_desc().key_range.start - ); - assert_eq!( - delta_layers[idx - 1].layer_desc().key_range.end, - delta_layers[idx].layer_desc().key_range.start - ); + if discard { + for layer in image_layers { + layer.into_discarded_layer(); + } + for layer in delta_layers { + layer.into_discarded_layer(); + } + } else { + let image_layers = image_layers + .into_iter() + .map(|x| x.into_resident_layer()) + .collect_vec(); + let delta_layers = delta_layers + .into_iter() + .map(|x| x.into_resident_layer()) + .collect_vec(); + assert_eq!(image_layers.len(), N / 512 + 1); + assert_eq!(delta_layers.len(), N / 512 + 1); + for idx in 0..image_layers.len() { + assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); + assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); + assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN); + assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX); + if idx > 0 { + assert_eq!( + image_layers[idx - 1].layer_desc().key_range.end, + image_layers[idx].layer_desc().key_range.start + ); + assert_eq!( + delta_layers[idx - 1].layer_desc().key_range.end, + delta_layers[idx].layer_desc().key_range.start + ); + } } } } @@ -456,4 +648,49 @@ mod tests { .unwrap(); assert_eq!(layers.len(), 2); } + + #[tokio::test] + async fn write_split_single_key() { + let harness = TenantHarness::create("split_writer_write_split_single_key") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + const N: usize = 2000; + let mut delta_writer = SplitDeltaLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + 
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); + + for i in 0..N { + let i = i as u32; + delta_writer + .put_value( + get_key(0), + Lsn(i as u64 * 16 + 0x10), + Value::Image(get_large_img()), + &tline, + &ctx, + ) + .await + .unwrap(); + } + let delta_layers = delta_writer + .finish(&tline, &ctx, get_key(N as u32)) + .await + .unwrap(); + assert_eq!(delta_layers.len(), 1); + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b33e436fce..098c196ee8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5444,12 +5444,17 @@ impl Timeline { !(a.end <= b.start || b.end <= a.start) } - let guard = self.layers.read().await; - for layer in guard.layer_map()?.iter_historic_layers() { - if layer.is_delta() - && overlaps_with(&layer.lsn_range, &deltas.lsn_range) - && layer.lsn_range != deltas.lsn_range - { + if deltas.key_range.start.next() != deltas.key_range.end { + let guard = self.layers.read().await; + let mut invalid_layers = + guard.layer_map()?.iter_historic_layers().filter(|layer| { + layer.is_delta() + && overlaps_with(&layer.lsn_range, &deltas.lsn_range) + && layer.lsn_range != deltas.lsn_range + // skip single-key layer files + && layer.key_range.start.next() != layer.key_range.end + }); + if let Some(layer) = invalid_layers.next() { // If a delta layer overlaps with another delta layer AND their LSN range is not the same, panic panic!( "inserted layer violates delta layer LSN invariant: current_lsn_range={}..{}, conflict_lsn_range={}..{}", diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 7370ec1386..aad75ac59c 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -14,7 +14,7 @@ use super::{ RecordedDuration, Timeline, }; -use anyhow::{anyhow, Context}; +use anyhow::{anyhow, bail, Context}; use bytes::Bytes; use 
enumset::EnumSet; use fail::fail_point; @@ -32,6 +32,9 @@ use crate::page_cache; use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD}; use crate::tenant::remote_timeline_client::WaitCompletionError; use crate::tenant::storage_layer::merge_iterator::MergeIterator; +use crate::tenant::storage_layer::split_writer::{ + SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult, +}; use crate::tenant::storage_layer::{ AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState, }; @@ -71,15 +74,60 @@ pub(crate) struct KeyHistoryRetention { } impl KeyHistoryRetention { + /// Hack: skip the delta layer if we would otherwise produce a layer with the same key-lsn range. + /// + /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range. + /// For example, consider the case where a single delta with range [0x10,0x50) exists. + /// And we have branches at LSN 0x10, 0x20, 0x30. + /// Then we delete branch @ 0x20. + /// Bottom-most compaction may now delete the delta [0x20,0x30). + /// And that wouldn't change the shape of the layer. + /// + /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes. + /// + /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside. + async fn discard_key(key: &PersistentLayerKey, tline: &Arc, dry_run: bool) -> bool { + if dry_run { + return true; + } + let guard = tline.layers.read().await; + if !guard.contains_key(key) { + return false; + } + let layer_generation = guard.get_from_key(key).metadata().generation; + drop(guard); + if layer_generation == tline.generation { + info!( + key=%key, + ?layer_generation, + "discard layer due to duplicated layer key in the same generation", + ); + true + } else { + false + } + } + + /// Pipe a history of a single key to the writers. 
+ /// + /// If `image_writer` is none, the images will be placed into the delta layers. + /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images. + #[allow(clippy::too_many_arguments)] async fn pipe_to( self, key: Key, - delta_writer: &mut Vec<(Key, Lsn, Value)>, - mut image_writer: Option<&mut ImageLayerWriter>, + tline: &Arc, + delta_writer: &mut SplitDeltaLayerWriter, + mut image_writer: Option<&mut SplitImageLayerWriter>, stat: &mut CompactionStatistics, + dry_run: bool, ctx: &RequestContext, ) -> anyhow::Result<()> { let mut first_batch = true; + let discard = |key: &PersistentLayerKey| { + let key = key.clone(); + async move { Self::discard_key(&key, tline, dry_run).await } + }; for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon { if first_batch { if logs.len() == 1 && logs[0].1.is_image() { @@ -88,28 +136,45 @@ impl KeyHistoryRetention { }; stat.produce_image_key(img); if let Some(image_writer) = image_writer.as_mut() { - image_writer.put_image(key, img.clone(), ctx).await?; + image_writer + .put_image_with_discard_fn(key, img.clone(), tline, ctx, discard) + .await?; } else { - delta_writer.push((key, cutoff_lsn, Value::Image(img.clone()))); + delta_writer + .put_value_with_discard_fn( + key, + cutoff_lsn, + Value::Image(img.clone()), + tline, + ctx, + discard, + ) + .await?; } } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer.push((key, lsn, val)); + delta_writer + .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard) + .await?; } } first_batch = false; } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer.push((key, lsn, val)); + delta_writer + .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard) + .await?; } } } let KeyLogAtLsn(above_horizon_logs) = self.above_horizon; for (lsn, val) in above_horizon_logs { stat.produce_key(&val); - delta_writer.push((key, lsn, val)); + delta_writer + .put_value_with_discard_fn(key, lsn, val, 
tline, ctx, discard) + .await?; } Ok(()) } @@ -1814,11 +1879,27 @@ impl Timeline { } let mut selected_layers = Vec::new(); drop(gc_info); + // Pick all the layers that intersect with or lie below the gc_cutoff, and get the largest LSN in the selected layers. + let Some(max_layer_lsn) = layers + .iter_historic_layers() + .filter(|desc| desc.get_lsn_range().start <= gc_cutoff) + .map(|desc| desc.get_lsn_range().end) + .max() + else { + info!("no layers to compact with gc"); + return Ok(()); + }; + // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key + // layers to compact. for desc in layers.iter_historic_layers() { - if desc.get_lsn_range().start <= gc_cutoff { + if desc.get_lsn_range().end <= max_layer_lsn { selected_layers.push(guard.get_from_desc(&desc)); } } + if selected_layers.is_empty() { + info!("no layers to compact with gc"); + return Ok(()); + } retain_lsns_below_horizon.sort(); (selected_layers, gc_cutoff, retain_lsns_below_horizon) }; @@ -1848,27 +1929,53 @@ impl Timeline { lowest_retain_lsn ); // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs. - // Also, collect the layer information to decide when to split the new delta layers. - let mut downloaded_layers = Vec::new(); - let mut delta_split_points = BTreeSet::new(); + // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point. + let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?) for layer in &layer_selection { - let resident_layer = layer.download_and_keep_resident().await?; - downloaded_layers.push(resident_layer); - let desc = layer.layer_desc(); if desc.is_delta() { - // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon) - // so that we can avoid having too many small delta layers. 
- let key_range = desc.get_key_range(); - delta_split_points.insert(key_range.start); - delta_split_points.insert(key_range.end); + // ignore single-key layer files + if desc.key_range.start.next() != desc.key_range.end { + let lsn_range = &desc.lsn_range; + lsn_split_point.insert(lsn_range.start); + lsn_split_point.insert(lsn_range.end); + } stat.visit_delta_layer(desc.file_size()); } else { stat.visit_image_layer(desc.file_size()); } } + for layer in &layer_selection { + let desc = layer.layer_desc(); + let key_range = &desc.key_range; + if desc.is_delta() && key_range.start.next() != key_range.end { + let lsn_range = desc.lsn_range.clone(); + let intersects = lsn_split_point.range(lsn_range).collect_vec(); + if intersects.len() > 1 { + bail!( + "cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]", + desc.key(), + intersects.into_iter().map(|lsn| lsn.to_string()).join(", ") + ); + } + } + } + // The maximum LSN we are processing in this compaction loop + let end_lsn = layer_selection + .iter() + .map(|l| l.layer_desc().lsn_range.end) + .max() + .unwrap(); + // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized + // as an L0 layer. 
+ let hack_end_key = Key::NON_L0_MAX; let mut delta_layers = Vec::new(); let mut image_layers = Vec::new(); + let mut downloaded_layers = Vec::new(); + for layer in &layer_selection { + let resident_layer = layer.download_and_keep_resident().await?; + downloaded_layers.push(resident_layer); + } for resident_layer in &downloaded_layers { if resident_layer.layer_desc().is_delta() { let layer = resident_layer.get_as_delta(ctx).await?; @@ -1884,138 +1991,17 @@ impl Timeline { let mut accumulated_values = Vec::new(); let mut last_key: Option = None; - enum FlushDeltaResult { - /// Create a new resident layer - CreateResidentLayer(ResidentLayer), - /// Keep an original delta layer - KeepLayer(PersistentLayerKey), - } - - #[allow(clippy::too_many_arguments)] - async fn flush_deltas( - deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>, - last_key: Key, - delta_split_points: &[Key], - current_delta_split_point: &mut usize, - tline: &Arc, - lowest_retain_lsn: Lsn, - ctx: &RequestContext, - stats: &mut CompactionStatistics, - dry_run: bool, - last_batch: bool, - ) -> anyhow::Result> { - // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid - // overlapping layers. - // - // If we have a structure like this: - // - // | Delta 1 | | Delta 4 | - // |---------| Delta 2 |---------| - // | Delta 3 | | Delta 5 | - // - // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4. - // A simple solution here is to split the delta layers using the original boundary, while this - // might produce a lot of small layers. This should be improved and fixed in the future. 
- let mut need_split = false; - while *current_delta_split_point < delta_split_points.len() - && last_key >= delta_split_points[*current_delta_split_point] - { - *current_delta_split_point += 1; - need_split = true; - } - if !need_split && !last_batch { - return Ok(None); - } - let deltas: Vec<(Key, Lsn, Value)> = std::mem::take(deltas); - if deltas.is_empty() { - return Ok(None); - } - let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1; - let delta_key = PersistentLayerKey { - key_range: { - let key_start = deltas.first().unwrap().0; - let key_end = deltas.last().unwrap().0.next(); - key_start..key_end - }, - lsn_range: lowest_retain_lsn..end_lsn, - is_delta: true, - }; - { - // Hack: skip delta layer if we need to produce a layer of a same key-lsn. - // - // This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range. - // For example, consider the case where a single delta with range [0x10,0x50) exists. - // And we have branches at LSN 0x10, 0x20, 0x30. - // Then we delete branch @ 0x20. - // Bottom-most compaction may now delete the delta [0x20,0x30). - // And that wouldnt' change the shape of the layer. - // - // Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes. - // That's why it's safe to skip. - let guard = tline.layers.read().await; - - if guard.contains_key(&delta_key) { - let layer_generation = guard.get_from_key(&delta_key).metadata().generation; - drop(guard); - if layer_generation == tline.generation { - stats.discard_delta_layer(); - // TODO: depending on whether we design this compaction process to run along with - // other compactions, there could be layer map modifications after we drop the - // layer guard, and in case it creates duplicated layer key, we will still error - // in the end. 
- info!( - key=%delta_key, - ?layer_generation, - "discard delta layer due to duplicated layer in the same generation" - ); - return Ok(Some(FlushDeltaResult::KeepLayer(delta_key))); - } - } - } - - let mut delta_layer_writer = DeltaLayerWriter::new( - tline.conf, - tline.timeline_id, - tline.tenant_shard_id, - delta_key.key_range.start, - lowest_retain_lsn..end_lsn, - ctx, - ) - .await?; - for (key, lsn, val) in deltas { - delta_layer_writer.put_value(key, lsn, val, ctx).await?; - } - - stats.produce_delta_layer(delta_layer_writer.size()); - if dry_run { - return Ok(None); - } - - let (desc, path) = delta_layer_writer - .finish(delta_key.key_range.end, ctx) - .await?; - let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?; - Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer))) - } - - // Hack the key range to be min..(max-1). Otherwise, the image layer will be - // interpreted as an L0 delta layer. - let hack_image_layer_range = { - let mut end_key = Key::MAX; - end_key.field6 -= 1; - Key::MIN..end_key - }; - // Only create image layers when there is no ancestor branches. TODO: create covering image layer // when some condition meet. let mut image_layer_writer = if self.ancestor_timeline.is_none() { Some( - ImageLayerWriter::new( + SplitImageLayerWriter::new( self.conf, self.timeline_id, self.tenant_shard_id, - &hack_image_layer_range, // covers the full key range + Key::MIN, lowest_retain_lsn, + self.get_compaction_target_size(), ctx, ) .await?, @@ -2024,6 +2010,17 @@ impl Timeline { None }; + let mut delta_layer_writer = SplitDeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + Key::MIN, + lowest_retain_lsn..end_lsn, + self.get_compaction_target_size(), + ctx, + ) + .await?; + /// Returns None if there is no ancestor branch. Throw an error when the key is not found. 
/// /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image @@ -2044,47 +2041,11 @@ impl Timeline { let img = tline.get(key, tline.ancestor_lsn, ctx).await?; Ok(Some((key, tline.ancestor_lsn, img))) } - let image_layer_key = PersistentLayerKey { - key_range: hack_image_layer_range, - lsn_range: PersistentLayerDesc::image_layer_lsn_range(lowest_retain_lsn), - is_delta: false, - }; - - // Like with delta layers, it can happen that we re-produce an already existing image layer. - // This could happen when a user triggers force compaction and image generation. In this case, - // it's always safe to rewrite the layer. - let discard_image_layer = { - let guard = self.layers.read().await; - if guard.contains_key(&image_layer_key) { - let layer_generation = guard.get_from_key(&image_layer_key).metadata().generation; - drop(guard); - if layer_generation == self.generation { - // TODO: depending on whether we design this compaction process to run along with - // other compactions, there could be layer map modifications after we drop the - // layer guard, and in case it creates duplicated layer key, we will still error - // in the end. - info!( - key=%image_layer_key, - ?layer_generation, - "discard image layer due to duplicated layer key in the same generation", - ); - true - } else { - false - } - } else { - false - } - }; // Actually, we can decide not to write to the image layer at all at this point because // the key and LSN range are determined. However, to keep things simple here, we still // create this writer, and discard the writer in the end. - let mut delta_values = Vec::new(); - let delta_split_points = delta_split_points.into_iter().collect_vec(); - let mut current_delta_split_point = 0; - let mut delta_layers = Vec::new(); while let Some((key, lsn, val)) = merge_iter.next().await? 
{ if cancel.is_cancelled() { return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error @@ -2115,27 +2076,14 @@ impl Timeline { retention .pipe_to( *last_key, - &mut delta_values, + self, + &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, + dry_run, ctx, ) .await?; - delta_layers.extend( - flush_deltas( - &mut delta_values, - *last_key, - &delta_split_points, - &mut current_delta_split_point, - self, - lowest_retain_lsn, - ctx, - &mut stat, - dry_run, - false, - ) - .await?, - ); accumulated_values.clear(); *last_key = key; accumulated_values.push((key, lsn, val)); @@ -2159,43 +2107,75 @@ impl Timeline { retention .pipe_to( last_key, - &mut delta_values, + self, + &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, + dry_run, ctx, ) .await?; - delta_layers.extend( - flush_deltas( - &mut delta_values, - last_key, - &delta_split_points, - &mut current_delta_split_point, - self, - lowest_retain_lsn, - ctx, - &mut stat, - dry_run, - true, - ) - .await?, - ); - assert!(delta_values.is_empty(), "unprocessed keys"); - let image_layer = if discard_image_layer { - stat.discard_image_layer(); - None - } else if let Some(writer) = image_layer_writer { - stat.produce_image_layer(writer.size()); + let discard = |key: &PersistentLayerKey| { + let key = key.clone(); + async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await } + }; + + let produced_image_layers = if let Some(writer) = image_layer_writer { if !dry_run { - Some(writer.finish(self, ctx).await?) + writer + .finish_with_discard_fn(self, ctx, hack_end_key, discard) + .await? } else { - None + let (layers, _) = writer.take()?; + assert!(layers.is_empty(), "image layers produced in dry run mode?"); + Vec::new() } } else { - None + Vec::new() }; + let produced_delta_layers = if !dry_run { + delta_layer_writer + .finish_with_discard_fn(self, ctx, hack_end_key, discard) + .await? 
+ } else { + let (layers, _) = delta_layer_writer.take()?; + assert!(layers.is_empty(), "delta layers produced in dry run mode?"); + Vec::new() + }; + + let mut compact_to = Vec::new(); + let mut keep_layers = HashSet::new(); + let produced_delta_layers_len = produced_delta_layers.len(); + let produced_image_layers_len = produced_image_layers.len(); + for action in produced_delta_layers { + match action { + SplitWriterResult::Produced(layer) => { + stat.produce_delta_layer(layer.layer_desc().file_size()); + compact_to.push(layer); + } + SplitWriterResult::Discarded(l) => { + keep_layers.insert(l); + stat.discard_delta_layer(); + } + } + } + for action in produced_image_layers { + match action { + SplitWriterResult::Produced(layer) => { + stat.produce_image_layer(layer.layer_desc().file_size()); + compact_to.push(layer); + } + SplitWriterResult::Discarded(l) => { + keep_layers.insert(l); + stat.discard_image_layer(); + } + } + } + let mut layer_selection = layer_selection; + layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key())); + info!( "gc-compaction statistics: {}", serde_json::to_string(&stat)? @@ -2206,28 +2186,11 @@ impl Timeline { } info!( - "produced {} delta layers and {} image layers", - delta_layers.len(), - if image_layer.is_some() { 1 } else { 0 } + "produced {} delta layers and {} image layers, {} layers are kept", + produced_delta_layers_len, + produced_image_layers_len, + layer_selection.len() ); - let mut compact_to = Vec::new(); - let mut keep_layers = HashSet::new(); - for action in delta_layers { - match action { - FlushDeltaResult::CreateResidentLayer(layer) => { - compact_to.push(layer); - } - FlushDeltaResult::KeepLayer(l) => { - keep_layers.insert(l); - } - } - } - if discard_image_layer { - keep_layers.insert(image_layer_key); - } - let mut layer_selection = layer_selection; - layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key())); - compact_to.extend(image_layer); // Step 3: Place back to the layer map. {