From 76b339b15049da100c03c30973054ae2c4e097be Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Wed, 21 Jun 2023 14:38:11 -0400 Subject: [PATCH] create partial image layers Signed-off-by: Alex Chi --- pageserver/src/tenant/storage_layer.rs | 5 ++ .../src/tenant/storage_layer/delta_layer.rs | 4 + pageserver/src/tenant/timeline.rs | 76 +++++++++++++++---- 3 files changed, 69 insertions(+), 16 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 7bc513b3a1..b5f3bc8e07 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -353,6 +353,11 @@ pub trait Layer: std::fmt::Debug + Send + Sync { /// the previous non-incremental layer. fn is_incremental(&self) -> bool; + /// Is this a delta layer? + fn is_delta(&self) -> bool { + false + } + /// /// Return data needed to reconstruct given page at LSN. /// diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 6e14663121..b9c3400225 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -399,6 +399,10 @@ impl Layer for DeltaLayer { fn short_id(&self) -> String { self.layer_desc().short_id() } + + fn is_delta(&self) -> bool { + true + } } impl PersistentLayer for DeltaLayer { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d43dd95b90..ccc9fb8a7f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3359,7 +3359,7 @@ struct CompactLevel0Phase1Result { #[derive(Default)] struct CompactTieredPhase1Result { - new_layers: Vec, + new_layers: Vec>, new_tier_at: usize, removed_tiers: Vec, } @@ -3842,11 +3842,9 @@ impl Timeline { let mut total_size_up_to_lvl = 0; let mut compact_tiers = Vec::new(); for (tier_id, size) in tier_sizes { - if total_size_up_to_lvl != 0 { - if total_size_up_to_lvl as f64 / size as f64 > size_ratio { - info!("full compaction triggered by size ratio"); - return Some(compact_tiers); - } + if total_size_up_to_lvl != 0 && total_size_up_to_lvl as f64 / size as f64 > size_ratio { + info!("full compaction triggered by size ratio"); + return Some(compact_tiers); } total_size_up_to_lvl += size; compact_tiers.push(tier_id); @@ -3903,6 +3901,7 @@ impl Timeline { let deltas_to_compact_layers = deltas_to_compact_layers .into_iter() .map(|l| self.lcache.get_from_desc(&l)) + .filter(|l| l.is_delta()) .collect_vec(); let lsn_range = { @@ -3974,15 +3973,54 @@ impl Timeline { // TODO(chi): merge with compact l0 - let mut new_layers = Vec::new(); + let mut new_layers: Vec> = Vec::new(); let mut prev_key: Option = None; let mut writer: Option = None; + let mut image_writer: Option = None; let mut key_values_total_size = 0u64; let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key + let mut same_key_cnt = 0; + let mut construct_image_for_key = false; + let image_lsn = Lsn(lsn_range.end.0 - 1); + for x in all_values_iter { let (key, lsn, value) = x?; let same_key = prev_key.map_or(false, |prev_key| prev_key == key); + if same_key { + same_key_cnt += 1; + } else { + same_key_cnt = 1; + construct_image_for_key = false; + } + if same_key_cnt >= 20 && !construct_image_for_key { + let img = match self.get(key, image_lsn, ctx).await { + Ok(img) => img, + Err(err) => { + if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) { + warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}"); + ZERO_PAGE.clone() + } else { + return Err(CompactionError::Other(err.into())); + } + } + }; + if image_writer.is_none() { + // Create writer if not initiaized yet + image_writer = Some(ImageLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_id, + // TODO(chi): should not use the full key range + &(key..Key::MAX), + image_lsn, + true, + )?); + } + image_writer.as_mut().unwrap().put_image(key, &img)?; + construct_image_for_key = true; + } + // We need to check key boundaries once we reach next key or end of layer with the same key if !same_key || lsn == dup_end_lsn { let mut next_key_size = 0u64; @@ -4030,7 +4068,9 @@ impl Timeline { || written_size + key_values_total_size > target_file_size { // ... if so, flush previous layer and prepare to write new one - new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?); + new_layers.push(Arc::new( + writer.take().unwrap().finish(prev_key.unwrap().next())?, + )); writer = None; } } @@ -4063,12 +4103,17 @@ impl Timeline { prev_key = Some(key); } if let Some(writer) = writer { - new_layers.push(writer.finish(prev_key.unwrap().next())?); + new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?)); + } + + if let Some(image_writer) = image_writer { + new_layers.push(Arc::new(image_writer.finish()?)); } // Sync layers if !new_layers.is_empty() { - let mut layer_paths: Vec = new_layers.iter().map(|l| l.path()).collect(); + let mut layer_paths: Vec = + new_layers.iter().map(|l| l.local_path().unwrap()).collect(); // Fsync all the layer files and directory using multiple threads to // minimize latency. @@ -4148,7 +4193,7 @@ impl Timeline { let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); let mut new_layer_descs = vec![]; for l in new_layers { - let new_path = l.path(); + let new_path = l.local_path().unwrap(); let metadata = new_path.metadata().with_context(|| { format!( @@ -4170,15 +4215,14 @@ impl Timeline { .add(metadata.len()); new_layer_paths.insert(new_path, LayerFileMetadata::new(metadata.len())); - let x: Arc = Arc::new(l); - x.access_stats().record_residence_event( + l.access_stats().record_residence_event( &updates, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - updates.insert_historic_new(x.layer_desc().clone()); - new_layer_descs.push(x.layer_desc().clone().into()); - self.lcache.create_new_layer(x); + updates.insert_historic_new(l.layer_desc().clone()); + new_layer_descs.push(l.layer_desc().clone().into()); + self.lcache.create_new_layer(l); } let new_tier_id = updates.next_tier_id();