From f31cc2394b94635c048b939e89f6eb49e28d91fb Mon Sep 17 00:00:00 2001
From: Alex Chi Z
Date: Fri, 7 Jul 2023 12:13:44 -0400
Subject: [PATCH] compaction PoC: subcompaction (#4656)

This PR adds subcompaction support to the compaction PoC. A compaction
job of >= 4GB (16x the target file size) is split across 4 parallel
subcompaction jobs.

---------

Signed-off-by: Alex Chi Z
---
 pageserver/src/metrics.rs                     |  28 +
 .../tenant/storage_layer/inmemory_layer.rs    |   4 +-
 .../src/tenant/storage_layer/layer_desc.rs    |   4 +-
 pageserver/src/tenant/timeline.rs             | 559 +++++++++++-------
 4 files changed, 361 insertions(+), 234 deletions(-)

diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 155b39ad24..de346551f5 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -53,6 +53,24 @@ pub enum StorageTimeOperation {
     CreateTenant,
 }
 
+pub static NUM_TIERS: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "pageserver_storage_tiers_num",
+        "Number of sorted runs",
+        &["tenant_id", "timeline_id"],
+    )
+    .expect("failed to define a metric")
+});
+
+pub static NUM_COMPACTIONS: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "pageserver_storage_compaction_num",
+        "Number of ongoing compactions",
+        &["tenant_id", "timeline_id"],
+    )
+    .expect("failed to define a metric")
+});
+
 pub static STORAGE_PHYSICAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
     register_int_gauge_vec!(
         "pageserver_storage_physical_size_sum",
@@ -784,6 +802,8 @@ pub struct TimelineMetrics {
     pub persistent_bytes_written: IntCounter,
     pub evictions: IntCounter,
     pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
+    pub num_tiers: IntGauge,
+    pub num_compactions: IntGauge,
 }
 
 impl TimelineMetrics {
@@ -849,6 +869,12 @@ impl TimelineMetrics {
             .unwrap();
         let evictions_with_low_residence_duration =
             evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
+        let num_tiers = NUM_TIERS
+            .get_metric_with_label_values(&[&tenant_id, &timeline_id])
+            .unwrap();
+        let num_compactions = NUM_COMPACTIONS
+            .get_metric_with_label_values(&[&tenant_id, &timeline_id])
+            .unwrap();
 
         TimelineMetrics {
             tenant_id,
@@ -875,6 +901,8 @@ impl TimelineMetrics {
                 evictions_with_low_residence_duration,
             ),
             read_num_fs_layers,
+            num_tiers,
+            num_compactions,
         }
     }
 }
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 6b58031eb6..c4dcebb621 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -150,8 +150,8 @@ impl Layer for InMemoryLayer {
             .unwrap_or_default();
 
         println!(
-            "----- in-memory layer for tli {} LSNs {}-{} ----",
-            self.timeline_id, self.start_lsn, end_str,
+            "----- in-memory layer LSNs {}-{} ----",
+            self.start_lsn, end_str,
         );
 
         if !verbose {
diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs
index d023f31bdc..9cfcfdba49 100644
--- a/pageserver/src/tenant/storage_layer/layer_desc.rs
+++ b/pageserver/src/tenant/storage_layer/layer_desc.rs
@@ -173,9 +173,7 @@ impl PersistentLayerDesc {
     pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
         println!(
-            "----- layer for ten {} tli {} keys {}-{} lsn {}-{} size {} is_delta {} is_incremental {} ----",
-            self.tenant_id,
-            self.timeline_id,
+            "----- layer for keys {}-{} lsn {}-{} size {} is_delta {} is_incremental {} ----",
             self.key_range.start,
             self.key_range.end,
             self.lsn_range.start,
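Note on the metrics hunks above: the two new gauges follow the standard prometheus pattern of a process-global labelled vec plus a per-timeline child handle resolved once in TimelineMetrics::new(). A minimal self-contained sketch of that pattern, including the inc/dec-around-a-job idiom that the timeline.rs changes below use for NUM_COMPACTIONS; the metric registration is copied from the patch, while the compact() wrapper and its string arguments are made up for illustration:

    use once_cell::sync::Lazy;
    use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

    // Process-global labelled gauge vec, registered once.
    static NUM_COMPACTIONS: Lazy<IntGaugeVec> = Lazy::new(|| {
        register_int_gauge_vec!(
            "pageserver_storage_compaction_num",
            "Number of ongoing compactions",
            &["tenant_id", "timeline_id"],
        )
        .expect("failed to define a metric")
    });

    fn compact(tenant_id: &str, timeline_id: &str) {
        // Resolve the per-timeline child gauge once.
        let gauge: IntGauge = NUM_COMPACTIONS.with_label_values(&[tenant_id, timeline_id]);
        gauge.inc();
        // dec() runs on every exit path, including early returns and `?`.
        scopeguard::defer! { gauge.dec(); }
        // ... compaction work goes here ...
    }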
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 772e1457f4..f3af3a9ade 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -504,6 +504,16 @@ pub enum LogicalSizeCalculationCause {
     TenantSizeHandler,
 }
 
+fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
+    !(a.end <= b.start || b.end <= a.start)
+}
+/// a contains b
+/// ---- a -----
+///    -- b --
+fn contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
+    b.start >= a.start && b.end <= a.end
+}
+
 /// Public interface functions
 impl Timeline {
     /// Get the LSN where this branch was created
@@ -3155,6 +3165,10 @@
             self.lcache.create_new_layer(l);
             batch_updates.flush();
 
+            self.metrics
+                .num_tiers
+                .set(layers.sorted_runs.num_of_tiers() as i64);
+
             // update the timeline's physical size
             self.metrics.resident_physical_size_gauge.add(sz);
             // update metrics
@@ -3816,6 +3830,9 @@
         target_file_size: u64,
         ctx: &RequestContext,
     ) -> Result<(), CompactionError> {
+        self.metrics.num_compactions.inc();
+        scopeguard::defer!(self.metrics.num_compactions.dec());
+
         if ENABLE_TIERED_COMPACTION {
             return self
                 .compact_tiered(layer_removal_cs, target_file_size, ctx)
@@ -4009,7 +4026,7 @@
     }
 
     async fn compact_tiered_phase1(
-        &self,
+        self: &Arc<Self>,
        _layer_removal_cs: DeleteGuardRead,
        target_file_size: u64,
        ctx: &RequestContext,
@@ -4079,7 +4096,7 @@
        // compute deltas that can be trivially moved
        let mut deltas_to_compact_layers = vec![];
        let mut trivial_move_layers = vec![];
-        for (idx, (image_layers, delta_layers)) in layers_in_tier.into_iter().enumerate() {
+        for (idx, (_, delta_layers)) in layers_in_tier.iter().enumerate() {
            let range_to_check = {
                let start = layers_range
                    .iter()
@@ -4097,31 +4114,56 @@
                    .unwrap();
                start..end
            };
-            fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
-                !(a.end <= b.start || b.end <= a.start)
-            }
-            /// a contains b
-            /// ---- a -----
-            ///    -- b --
-            fn contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
-                b.start >= a.start && b.end <= a.end
-            }
            for layer in delta_layers.into_iter() {
-                if overlaps_with(&range_to_check, &layer.get_key_range()) {
+                if overlaps_with(&range_to_check, &layer.key_range) {
                    // compact if overlaps
-                    deltas_to_compact_layers.push(layer);
+                    deltas_to_compact_layers.push(layer.clone());
                } else {
                    // if delta layer does not overlap, trivial move
-                    trivial_move_layers.push(layer);
+                    trivial_move_layers.push(layer.clone());
                }
            }
-            for layer in image_layers.into_iter() {
-                if contains(&range_to_check, &layer.get_key_range()) {
+        }
+
+        // no deltas overlap: trivially move everything and return early
+        if deltas_to_compact_layers.is_empty() {
+            for (idx, (image_layers, _)) in layers_in_tier.iter().enumerate() {
+                trivial_move_layers.extend(image_layers.iter().cloned());
+            }
+            return Ok(Some(CompactTieredPhase1Result {
+                new_layers: vec![],
+                new_tier_at: *tier_to_compact.last().unwrap(),
+                removed_tiers: tier_to_compact,
+                trivial_move_layers: trivial_move_layers
+                    .iter()
+                    .map(|x| self.lcache.get_from_desc(x))
+                    .collect_vec(),
+            }));
+        }
+
+        // otherwise, find image layers contained in the compacting key range
+        let compacting_key_range = {
+            let start = deltas_to_compact_layers
+                .iter()
+                .map(|x| x.key_range.start)
+                .min()
+                .unwrap();
+            let end = deltas_to_compact_layers
+                .iter()
+                .map(|x| x.key_range.end)
+                .max()
+                .unwrap();
+            start..end
+        };
+
+        for (idx, (image_layers, _)) in layers_in_tier.iter().enumerate() {
+            for layer in image_layers {
+                if contains(&compacting_key_range, &layer.key_range) {
                    // if image layer is within compaction range, remove it
-                    deltas_to_compact_layers.push(layer);
+                    deltas_to_compact_layers.push(layer.clone());
                } else {
                    // otherwise, trivially move
-                    trivial_move_layers.push(layer);
+                    trivial_move_layers.push(layer.clone());
                }
            }
        }
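The hunk above classifies each layer as either "must be rewritten" (it overlaps the range being compacted) or "trivial move" (disjoint, so it can be re-linked into the new tier untouched). A self-contained sketch of the two range predicates and that classification loop, with a bare key_range struct standing in for the real layer descriptor:

    use std::ops::Range;

    fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
        !(a.end <= b.start || b.end <= a.start)
    }

    /// a contains b
    fn contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
        b.start >= a.start && b.end <= a.end
    }

    /// Stand-in for PersistentLayerDesc: just a key range.
    struct LayerDesc {
        key_range: Range<u64>,
    }

    /// Split layers into "compact" and "trivial move", like the loop above.
    fn classify(layers: &[LayerDesc], compact_range: &Range<u64>) -> (Vec<usize>, Vec<usize>) {
        let (mut to_compact, mut trivial_move) = (vec![], vec![]);
        for (i, l) in layers.iter().enumerate() {
            if overlaps_with(compact_range, &l.key_range) {
                to_compact.push(i); // overlapping layers must be merged
            } else {
                trivial_move.push(i); // disjoint layers are re-linked, not rewritten
            }
        }
        (to_compact, trivial_move)
    }

    fn main() {
        let layers = vec![
            LayerDesc { key_range: 0..10 },
            LayerDesc { key_range: 10..20 }, // half-open: touching is not overlapping
            LayerDesc { key_range: 5..15 },
        ];
        assert_eq!(classify(&layers, &(0..10)), (vec![0, 2], vec![1]));
    }

Because the ranges are half-open, an adjacent layer (end == start) counts as non-overlapping, which is what lets it be moved without rewriting.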
@@ -4129,7 +4171,6 @@
        let deltas_to_compact_layers = deltas_to_compact_layers
            .into_iter()
            .map(|l| self.lcache.get_from_desc(&l))
-            .filter(|l| l.layer_desc().is_delta())
            .collect_vec();

        let lsn_range = {
@@ -4164,221 +4205,279 @@
            )
        };

-        if deltas_to_compact_layers.is_empty() {
-            return Ok(Some(CompactTieredPhase1Result {
-                new_layers: vec![],
-                new_tier_at: *tier_to_compact.last().unwrap(),
-                removed_tiers: tier_to_compact,
-                trivial_move_layers,
-            }));
-        }
+        async fn run_compaction_for_range(
+            tl: Arc<Timeline>,
+            deltas_to_compact_layers: Vec<Arc<dyn PersistentLayer>>,
+            ctx: &RequestContext,
+            key_range: Option<Range<Key>>,
+            lsn_range: Range<Lsn>,
+            target_file_size: u64,
+        ) -> Result<Vec<Arc<dyn PersistentLayer>>, CompactionError> {
+            let deltas_to_compact_layers = deltas_to_compact_layers
+                .into_iter()
+                .filter(|x| x.layer_desc().is_delta())
+                .collect_vec();
+            // This iterator walks through all key-value pairs from all the layers
+            // we're compacting, in key, LSN order.
+            let all_values_iter = itertools::process_results(
+                deltas_to_compact_layers.iter().map(|l| l.iter(ctx)),
+                |iter_iter| {
+                    iter_iter.kmerge_by(|a, b| {
+                        if let Ok((a_key, a_lsn, _)) = a {
+                            if let Ok((b_key, b_lsn, _)) = b {
+                                match a_key.cmp(b_key) {
+                                    Ordering::Less => true,
+                                    Ordering::Equal => a_lsn < b_lsn,
+                                    Ordering::Greater => false,
+                                }
+                            } else {
+                                false
+                            }
+                        } else {
+                            true
+                        }
+                    })
+                },
+            )?;
+
+            // TODO(chi): support image layer generation
+
+            // TODO(chi): merge with compact l0
+
+            let mut new_layers: Vec<Arc<dyn PersistentLayer>> = Vec::new();
+            let mut prev_key: Option<Key> = None;
+            let mut prev_image_key: Option<Key> = None;
+            let mut writer: Option<DeltaLayerWriter> = None;
+            let mut image_writer: Option<ImageLayerWriter> = None;
+            let mut same_key_cnt = 0;
+            let mut construct_image_for_key = false;
+            let image_lsn = Lsn(lsn_range.end.0 - 1);
+
+            const PAGE_MATERIALIZE_THRESHOLD: usize = 16;
+
+            for x in all_values_iter {
+                let (key, lsn, value) = x?;
+                if let Some(ref key_range) = key_range {
+                    if key < key_range.start {
+                        continue;
+                    }
+                    if key >= key_range.end {
+                        break;
+                    }
+                }
+
+                let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
+                if same_key {
+                    same_key_cnt += 1;
+                } else {
+                    same_key_cnt = 1;
+                    construct_image_for_key = false;
+                }
+
+                // We need to check layer boundaries once we reach the next key
+                if !same_key {
+                    // TODO: handle the case that updates to one page exceed one layer file target size, as in
+                    // L0 compaction.
+                    if writer.is_some() {
+                        let written_size = writer.as_mut().unwrap().size();
+                        // check if this key causes the layer to overflow...
+                        if written_size > target_file_size {
+                            // ... if so, flush the previous layer and prepare to write a new one
+                            new_layers.push(Arc::new(
+                                writer.take().unwrap().finish(prev_key.unwrap().next())?,
+                            ));
+
+                            // only write an image layer when we end a delta layer
+                            if image_writer.is_some() {
+                                let image_writer_mut = image_writer.as_mut().unwrap();
+                                let written_size: u64 = image_writer_mut.size();
+                                if written_size > target_file_size {
+                                    new_layers.push(Arc::new(
+                                        image_writer
+                                            .take()
+                                            .unwrap()
+                                            .finish(prev_image_key.unwrap().next())?,
+                                    ));
+                                    image_writer = None; // this is redundant
+                                }
+                            }
+
+                            writer = None; // this is redundant
+                        }
+                    }
+                }
+                if writer.is_none() {
+                    // Create writer if not initialized yet
+                    writer = Some(DeltaLayerWriter::new(
+                        tl.conf,
+                        tl.timeline_id,
+                        tl.tenant_id,
+                        key,
+                        lsn_range.clone(),
+                    )?);
+                }
+
+                fail_point!("delta-layer-writer-fail-before-finish", |_| {
+                    Err(anyhow::anyhow!("failpoint delta-layer-writer-fail-before-finish").into())
+                });
+
+                if same_key_cnt >= PAGE_MATERIALIZE_THRESHOLD {
+                    assert!(lsn <= image_lsn);
+                    if !construct_image_for_key {
+                        let img = match tl.get(key, image_lsn, ctx).await {
+                            Ok(img) => img,
+                            Err(err) => {
+                                if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
+                                    warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
+                                    ZERO_PAGE.clone()
+                                } else {
+                                    return Err(CompactionError::Other(err.into()));
+                                }
+                            }
+                        };
+
+                        if image_writer.is_none() {
+                            // Create writer if not initialized yet
+                            image_writer = Some(ImageLayerWriter::new(
+                                tl.conf,
+                                tl.timeline_id,
+                                tl.tenant_id,
+                                // TODO(chi): should not use the full key range
+                                key.clone(),
+                                image_lsn,
+                                true,
+                            )?);
+                        }
+
+                        let image_writer_mut = image_writer.as_mut().unwrap();
+                        image_writer_mut.put_image(key, &img)?;
+                        construct_image_for_key = true;
+                        prev_image_key = Some(key);
+                    }
+                }
+
+                writer.as_mut().unwrap().put_value(key, lsn, value)?;
+                prev_key = Some(key);
+            }
+            if let Some(writer) = writer {
+                new_layers.push(Arc::new(writer.finish(prev_key.as_ref().unwrap().next())?));
+            }
+
+            if let Some(image_writer) = image_writer {
+                new_layers.push(Arc::new(image_writer.finish(prev_key.unwrap().next())?));
+            }
+
+            if let Some(ref key_range) = key_range {
+                let total_size = new_layers
+                    .iter()
+                    .map(|x| x.layer_desc().file_size())
+                    .sum::<u64>();
+                info!(
+                    "subcompaction completed for key range {}..{} with total size of {}",
+                    key_range.start, key_range.end, total_size
+                );
+            }
+
+            Ok(new_layers)
+        }
-        // This iterator walks through all key-value pairs from all the layers
-        // we're compacting, in key, LSN order.
-        let all_values_iter = itertools::process_results(
-            deltas_to_compact_layers.iter().map(|l| l.iter(ctx)),
-            |iter_iter| {
-                iter_iter.kmerge_by(|a, b| {
-                    if let Ok((a_key, a_lsn, _)) = a {
-                        if let Ok((b_key, b_lsn, _)) = b {
-                            match a_key.cmp(b_key) {
-                                Ordering::Less => true,
-                                Ordering::Equal => a_lsn < b_lsn,
-                                Ordering::Greater => false,
-                            }
-                        } else {
-                            false
-                        }
-                    } else {
-                        true
-                    }
-                })
-            },
-        )?;
+        let total_size = deltas_to_compact_layers
+            .iter()
+            .map(|x| x.layer_desc().file_size())
+            .sum::<u64>();

-        // This iterator walks through all keys and is needed to calculate size used by each key
-        let mut all_keys_iter = itertools::process_results(
-            deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)),
-            |iter_iter| {
-                iter_iter.kmerge_by(|a, b| {
-                    let (a_key, a_lsn, _) = a;
-                    let (b_key, b_lsn, _) = b;
-                    match a_key.cmp(b_key) {
-                        Ordering::Less => true,
-                        Ordering::Equal => a_lsn < b_lsn,
-                        Ordering::Greater => false,
-                    }
-                })
-            },
-        )?;
+        const SUBCOMPACTION_JOBS: usize = 4;

-        // TODO(chi): support image layer generation
-
-        // TODO(chi): merge with compact l0
-
-        let mut new_layers: Vec<Arc<dyn PersistentLayer>> = Vec::new();
-        let mut prev_key: Option<Key> = None;
-        let mut prev_image_key: Option<Key> = None;
-        let mut writer: Option<DeltaLayerWriter> = None;
-        let mut image_writer: Option<ImageLayerWriter> = None;
-        let mut key_values_total_size = 0u64;
-        let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
-        let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
-        let mut same_key_cnt = 0;
-        let mut construct_image_for_key = false;
-        let image_lsn = Lsn(lsn_range.end.0 - 1);
-
-        const PAGE_MATERIALIZE_THRESHOLD: usize = 40;
-
-        for x in all_values_iter {
-            let (key, lsn, value) = x?;
-            let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
-            if same_key {
-                same_key_cnt += 1;
-            } else {
-                same_key_cnt = 1;
-                construct_image_for_key = false;
-            }
-            if same_key_cnt >= PAGE_MATERIALIZE_THRESHOLD {
-                assert!(lsn <= image_lsn);
-                if !construct_image_for_key {
-                    let img = match self.get(key, image_lsn, ctx).await {
-                        Ok(img) => img,
-                        Err(err) => {
-                            if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
-                                warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
-                                ZERO_PAGE.clone()
-                            } else {
-                                return Err(CompactionError::Other(err.into()));
-                            }
-                        }
-                    };
-                    if image_writer.is_none() {
-                        // Create writer if not initiaized yet
-                        image_writer = Some(ImageLayerWriter::new(
-                            self.conf,
-                            self.timeline_id,
-                            self.tenant_id,
-                            // TODO(chi): should not use the full key range
-                            key.clone(),
-                            image_lsn,
-                            true,
-                        )?);
-                    }
-
-                    let image_writer_mut = image_writer.as_mut().unwrap();
-                    image_writer_mut.put_image(key, &img)?;
-                    construct_image_for_key = true;
-                    prev_image_key = Some(key);
-                }
-            }
-
-            // We need to check key boundaries once we reach next key or end of layer with the same key
-            if !same_key || lsn == dup_end_lsn {
-                let mut next_key_size = 0u64;
-                let is_dup_layer = dup_end_lsn.is_valid();
-                dup_start_lsn = Lsn::INVALID;
-                if !same_key {
-                    dup_end_lsn = Lsn::INVALID;
-                }
-                // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
-                for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
-                    next_key_size = next_size;
-                    if key != next_key {
-                        if dup_end_lsn.is_valid() {
-                            // We are writting segment with duplicates:
-                            // place all remaining values of this key in separate segment
-                            dup_start_lsn = dup_end_lsn; // new segments starts where old stops
-                            dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
-                        }
-                        break;
-                    }
-                    key_values_total_size += next_size;
-                    // Check if it is time to split segment: if total keys size is larger than target file size.
-                    // We need to avoid generation of empty segments if next_size > target_file_size.
-                    if key_values_total_size > target_file_size && lsn != next_lsn {
-                        // Split key between multiple layers: such layer can contain only single key
-                        dup_start_lsn = if dup_end_lsn.is_valid() {
-                            dup_end_lsn // new segment with duplicates starts where old one stops
-                        } else {
-                            lsn // start with the first LSN for this key
-                        };
-                        dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
-                        break;
-                    }
-                }
-                // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
-                if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
-                    dup_start_lsn = dup_end_lsn;
-                    dup_end_lsn = lsn_range.end;
-                }
-
-                if writer.is_some() {
-                    let written_size = writer.as_mut().unwrap().size();
-                    // check if key cause layer overflow or contains hole...
-                    if is_dup_layer
-                        || dup_end_lsn.is_valid()
-                        || written_size + key_values_total_size > target_file_size
-                    {
-                        // ... if so, flush previous layer and prepare to write new one
-                        new_layers.push(Arc::new(
-                            writer.take().unwrap().finish(prev_key.unwrap().next())?,
-                        ));
-
-                        // only write image layer when we end a delta layer
-                        if image_writer.is_some() {
-                            let image_writer_mut = image_writer.as_mut().unwrap();
-                            let written_size: u64 = image_writer_mut.size();
-                            if written_size + key_values_total_size > target_file_size / 2 {
-                                new_layers.push(Arc::new(
-                                    image_writer
-                                        .take()
-                                        .unwrap()
-                                        .finish(prev_image_key.unwrap().next())?,
-                                ));
-                                image_writer = None; // this is redundant
-                            }
-                        }
-
-                        writer = None; // this is redundant
-                    }
-                }
-
-                // Remember size of key value because at next iteration we will access next item
-                key_values_total_size = next_key_size;
-            }
-            if writer.is_none() {
-                // Create writer if not initiaized yet
-                writer = Some(DeltaLayerWriter::new(
-                    self.conf,
-                    self.timeline_id,
-                    self.tenant_id,
-                    key,
-                    if dup_end_lsn.is_valid() {
-                        // this is a layer containing slice of values of the same key
-                        debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
-                        dup_start_lsn..dup_end_lsn
-                    } else {
-                        debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
-                        lsn_range.clone()
-                    },
-                )?);
-            }
-
-            fail_point!("delta-layer-writer-fail-before-finish", |_| {
-                Err(anyhow::anyhow!("failpoint delta-layer-writer-fail-before-finish").into())
-            });
-
-            writer.as_mut().unwrap().put_value(key, lsn, value)?;
-            prev_key = Some(key);
-        }
-        if let Some(writer) = writer {
-            new_layers.push(Arc::new(writer.finish(prev_key.as_ref().unwrap().next())?));
-        }
-
-        if let Some(image_writer) = image_writer {
-            new_layers.push(Arc::new(image_writer.finish(prev_key.unwrap().next())?));
-        }
+        let new_layers = if total_size > 16 * target_file_size {
+            // find split points
+            let mut deltas_to_compact_layers = deltas_to_compact_layers;
+            deltas_to_compact_layers.sort_by(|a, b| {
+                a.layer_desc()
+                    .key_range
+                    .start
+                    .cmp(&b.layer_desc().key_range.start)
+            });
+
+            let size_per_partition = total_size / SUBCOMPACTION_JOBS as u64;
+            let mut current_size = 0;
+            let mut partitions = vec![];
+            partitions.push(deltas_to_compact_layers[0].layer_desc().key_range.start);
+            for layer in &deltas_to_compact_layers {
+                if current_size >= size_per_partition {
+                    partitions.push(layer.layer_desc().key_range.start);
+                    if partitions.len() >= SUBCOMPACTION_JOBS {
+                        break;
+                    }
+                    current_size -= size_per_partition;
+                }
+                current_size += layer.layer_desc().file_size();
+            }
+            partitions.push(
+                deltas_to_compact_layers
+                    .iter()
+                    .map(|x| x.layer_desc().key_range.end)
+                    .max()
+                    .unwrap(),
+            );
+
+            info!("subcompaction partitions: {}", partitions.iter().join(", "));
+
+            // pick overlapping delta files
+            let mut subcompaction_jobs = vec![];
+            for i in 1..partitions.len() {
+                let compact_range = partitions[i - 1]..partitions[i];
+                let mut sub_deltas_to_compact_layers = vec![];
+                for layer in &deltas_to_compact_layers {
+                    if overlaps_with(&layer.layer_desc().key_range, &compact_range) {
+                        sub_deltas_to_compact_layers.push(layer.clone());
+                    }
+                }
+                subcompaction_jobs.push((compact_range, sub_deltas_to_compact_layers));
+            }
+
+            // parallel compaction
+            let mut handles = vec![];
+            for (id, (key_range, deltas_to_compact_layers)) in
+                subcompaction_jobs.into_iter().enumerate()
+            {
+                let ctx = ctx.clone();
+                let span = span!(Level::INFO, "subcompaction", %id);
+                let lsn_range = lsn_range.clone();
+                let this = self.clone();
+                handles.push(tokio::spawn(
+                    async move {
+                        run_compaction_for_range(
+                            this,
+                            deltas_to_compact_layers,
+                            &ctx,
+                            Some(key_range),
+                            lsn_range,
+                            target_file_size,
+                        )
+                        .await
+                    }
+                    .instrument(span),
+                ));
+            }
+
+            let mut new_layers = vec![];
+            for handle in handles {
+                let result = handle.await.unwrap()?;
+                new_layers.extend(result);
+            }
+
+            new_layers
+        } else {
+            run_compaction_for_range(
+                self.clone(),
+                deltas_to_compact_layers,
+                ctx,
+                None,
+                lsn_range,
+                target_file_size,
+            )
+            .await?
+        };

        // Sync layers
        if !new_layers.is_empty() {
@@ -4395,8 +4494,6 @@
            layer_paths.pop().unwrap();
        }

-        drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
-
        Ok(Some(CompactTieredPhase1Result {
            new_layers,
            new_tier_at: *tier_to_compact.last().unwrap(),
@@ -4539,6 +4636,10 @@
            "compaction complete, removed_tiers = {removed_tiers:?}, new_tier_at = {new_tier_at}"
        );

+        self.metrics
+            .num_tiers
+            .set(layers.sorted_runs.num_of_tiers() as i64);
+
        drop_wlock(guard);

        // Also schedule the deletions in remote storage
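Two details of the rewrite above are worth illustrating outside the diff. First, the (key, LSN) merge order that run_compaction_for_range relies on: itertools' kmerge_by takes a "less than" predicate and merges the per-layer sorted iterators into one stream ordered by key first, then by LSN. A runnable toy version, with plain tuples standing in for the real Key/Lsn types:

    use itertools::Itertools;

    fn main() {
        // Each inner vec mimics one delta layer: already sorted by (key, lsn).
        let layers = vec![
            vec![(1u64, 10u64), (2, 40)],
            vec![(1, 20), (3, 5)],
            vec![(2, 30)],
        ];
        let merged: Vec<_> = layers
            .into_iter()
            .kmerge_by(|a, b| match a.0.cmp(&b.0) {
                std::cmp::Ordering::Less => true,
                std::cmp::Ordering::Equal => a.1 < b.1, // same key: older LSN first
                std::cmp::Ordering::Greater => false,
            })
            .collect();
        assert_eq!(merged, [(1, 10), (1, 20), (2, 30), (2, 40), (3, 5)]);
    }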
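Second, the split-point selection: the subcompaction branch walks the layers in key order and emits a cut point every time roughly total_size / SUBCOMPACTION_JOBS bytes have accumulated, capping the number of cut points at SUBCOMPACTION_JOBS. A standalone sketch of that cut-point logic under the same assumptions as the patch (layers sorted by start key; u64 stands in for Key; split_points is a hypothetical name):

    /// Stand-in layer: (key_range_start, file_size).
    fn split_points(layers: &[(u64, u64)], jobs: usize) -> Vec<u64> {
        let total_size: u64 = layers.iter().map(|(_, sz)| sz).sum();
        let size_per_partition = total_size / jobs as u64;
        let mut partitions = vec![layers[0].0];
        let mut current_size = 0;
        for &(start, size) in layers {
            if current_size >= size_per_partition {
                // this layer's start key becomes a partition boundary
                partitions.push(start);
                if partitions.len() >= jobs {
                    break;
                }
                current_size -= size_per_partition;
            }
            current_size += size;
        }
        partitions
    }

    fn main() {
        // Sizes sum to 40, so each of the 4 jobs targets ~10 bytes.
        let layers = [(0u64, 8u64), (10, 12), (20, 8), (30, 12)];
        println!("{:?}", split_points(&layers, 4)); // prints [0, 20, 30]
    }

The caller then appends the maximum end key, so each adjacent pair of cut points becomes one subcompaction job running on its own tokio task. A layer straddling a boundary is handed to every job it overlaps, which is why run_compaction_for_range clips keys against its Option<Range<Key>> argument before writing.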