From 933a869f003732a7c62cd09c580cc17e6e0220cb Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Tue, 22 Aug 2023 20:03:14 +0300 Subject: [PATCH] refactor: compaction becomes async again (#5058) #4938 will make on-demand download of layers in compaction possible, so it's not suitable for our "policy" of no `spawn_blocking(|| ... Handle::block_on(async { spawn_blocking(...).await })` because this poses a clear deadlock risk. Nested spawn_blockings are because of the download using `tokio::fs::File`. - Remove `spawn_blocking` from caller of `compact_level0_phase1` - Remove `Handle::block_on` from `compact_level0_phase1` (indentation change) - Revert to `AsLayerDesc::layer_desc` usage temporarily (until it becomes field access in #4938) --- pageserver/src/tenant/timeline.rs | 222 ++++++++++++++---------------- 1 file changed, 107 insertions(+), 115 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 5913686bfe..6863e052df 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3410,8 +3410,8 @@ impl Timeline { /// start of level0 files compaction, the on-demand download should be revisited as well. /// /// [`compact_inner`]: Self::compact_inner - fn compact_level0_phase1( - self: Arc, + async fn compact_level0_phase1( + self: &Arc, _layer_removal_cs: Arc>, guard: tokio::sync::OwnedRwLockReadGuard, mut stats: CompactLevel0Phase1StatsBuilder, @@ -3562,7 +3562,7 @@ impl Timeline { .collect(); for dl in downcast_deltas.iter() { // TODO: replace this with an await once we fully go async - all_keys.extend(Handle::current().block_on(DeltaLayer::load_keys(dl, ctx))?); + all_keys.extend(DeltaLayer::load_keys(dl, ctx).await?); } // The current stdlib sorting implementation is designed in a way where it is @@ -3676,107 +3676,103 @@ impl Timeline { let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key - // TODO remove this block_on wrapper once we fully go async - Handle::current().block_on(async { - for &DeltaEntry { - key, lsn, ref val, .. - } in all_values_iter - { - let value = val.load().await?; - let same_key = prev_key.map_or(false, |prev_key| prev_key == key); - // We need to check key boundaries once we reach next key or end of layer with the same key - if !same_key || lsn == dup_end_lsn { - let mut next_key_size = 0u64; - let is_dup_layer = dup_end_lsn.is_valid(); - dup_start_lsn = Lsn::INVALID; - if !same_key { - dup_end_lsn = Lsn::INVALID; - } - // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size - for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() { - next_key_size = next_size; - if key != next_key { - if dup_end_lsn.is_valid() { - // We are writting segment with duplicates: - // place all remaining values of this key in separate segment - dup_start_lsn = dup_end_lsn; // new segments starts where old stops - dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range - } - break; - } - key_values_total_size += next_size; - // Check if it is time to split segment: if total keys size is larger than target file size. - // We need to avoid generation of empty segments if next_size > target_file_size. - if key_values_total_size > target_file_size && lsn != next_lsn { - // Split key between multiple layers: such layer can contain only single key - dup_start_lsn = if dup_end_lsn.is_valid() { - dup_end_lsn // new segment with duplicates starts where old one stops - } else { - lsn // start with the first LSN for this key - }; - dup_end_lsn = next_lsn; // upper LSN boundary is exclusive - break; - } - } - // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set. - if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() { - dup_start_lsn = dup_end_lsn; - dup_end_lsn = lsn_range.end; - } - if writer.is_some() { - let written_size = writer.as_mut().unwrap().size(); - let contains_hole = - next_hole < holes.len() && key >= holes[next_hole].key_range.end; - // check if key cause layer overflow or contains hole... - if is_dup_layer - || dup_end_lsn.is_valid() - || written_size + key_values_total_size > target_file_size - || contains_hole - { - // ... if so, flush previous layer and prepare to write new one - new_layers.push(Arc::new( - writer.take().unwrap().finish(prev_key.unwrap().next())?, - )); - writer = None; - - if contains_hole { - // skip hole - next_hole += 1; - } - } - } - // Remember size of key value because at next iteration we will access next item - key_values_total_size = next_key_size; + for &DeltaEntry { + key, lsn, ref val, .. + } in all_values_iter + { + let value = val.load().await?; + let same_key = prev_key.map_or(false, |prev_key| prev_key == key); + // We need to check key boundaries once we reach next key or end of layer with the same key + if !same_key || lsn == dup_end_lsn { + let mut next_key_size = 0u64; + let is_dup_layer = dup_end_lsn.is_valid(); + dup_start_lsn = Lsn::INVALID; + if !same_key { + dup_end_lsn = Lsn::INVALID; } - if writer.is_none() { - // Create writer if not initiaized yet - writer = Some(DeltaLayerWriter::new( - self.conf, - self.timeline_id, - self.tenant_id, - key, + // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size + for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() { + next_key_size = next_size; + if key != next_key { if dup_end_lsn.is_valid() { - // this is a layer containing slice of values of the same key - debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn); - dup_start_lsn..dup_end_lsn + // We are writting segment with duplicates: + // place all remaining values of this key in separate segment + dup_start_lsn = dup_end_lsn; // new segments starts where old stops + dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range + } + break; + } + key_values_total_size += next_size; + // Check if it is time to split segment: if total keys size is larger than target file size. + // We need to avoid generation of empty segments if next_size > target_file_size. + if key_values_total_size > target_file_size && lsn != next_lsn { + // Split key between multiple layers: such layer can contain only single key + dup_start_lsn = if dup_end_lsn.is_valid() { + dup_end_lsn // new segment with duplicates starts where old one stops } else { - debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); - lsn_range.clone() - }, - )?); + lsn // start with the first LSN for this key + }; + dup_end_lsn = next_lsn; // upper LSN boundary is exclusive + break; + } } + // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set. + if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() { + dup_start_lsn = dup_end_lsn; + dup_end_lsn = lsn_range.end; + } + if writer.is_some() { + let written_size = writer.as_mut().unwrap().size(); + let contains_hole = + next_hole < holes.len() && key >= holes[next_hole].key_range.end; + // check if key cause layer overflow or contains hole... + if is_dup_layer + || dup_end_lsn.is_valid() + || written_size + key_values_total_size > target_file_size + || contains_hole + { + // ... if so, flush previous layer and prepare to write new one + new_layers.push(Arc::new( + writer.take().unwrap().finish(prev_key.unwrap().next())?, + )); + writer = None; - fail_point!("delta-layer-writer-fail-before-finish", |_| { - Result::<_>::Err(anyhow::anyhow!( - "failpoint delta-layer-writer-fail-before-finish" - )) - }); - - writer.as_mut().unwrap().put_value(key, lsn, value)?; - prev_key = Some(key); + if contains_hole { + // skip hole + next_hole += 1; + } + } + } + // Remember size of key value because at next iteration we will access next item + key_values_total_size = next_key_size; } - Ok(()) - })?; + if writer.is_none() { + // Create writer if not initiaized yet + writer = Some(DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_id, + key, + if dup_end_lsn.is_valid() { + // this is a layer containing slice of values of the same key + debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn); + dup_start_lsn..dup_end_lsn + } else { + debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); + lsn_range.clone() + }, + )?); + } + + fail_point!("delta-layer-writer-fail-before-finish", |_| { + Err(CompactionError::Other(anyhow::anyhow!( + "failpoint delta-layer-writer-fail-before-finish" + ))) + }); + + writer.as_mut().unwrap().put_value(key, lsn, value)?; + prev_key = Some(key); + } if let Some(writer) = writer { new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?)); } @@ -3789,10 +3785,10 @@ impl Timeline { // we still might easily hit the limit otherwise. let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2; for layer in new_layers.iter() { - if layer.desc.file_size > warn_limit { + if layer.layer_desc().file_size > warn_limit { warn!( %layer, - "created delta file of size {} larger than double of target of {target_file_size}", layer.desc.file_size + "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size ); } } @@ -3810,7 +3806,7 @@ impl Timeline { stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now(); stats.new_deltas_count = Some(new_layers.len()); - stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum()); + stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum()); match TryInto::::try_into(stats) .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string")) @@ -3850,8 +3846,7 @@ impl Timeline { deltas_to_compact, } = { let phase1_span = info_span!("compact_level0_phase1"); - let myself = Arc::clone(self); - let ctx = ctx.attached_child(); // technically, the spawn_blocking can outlive this future + let ctx = ctx.attached_child(); let mut stats = CompactLevel0Phase1StatsBuilder { version: Some(2), tenant_id: Some(self.tenant_id), @@ -3865,18 +3860,15 @@ impl Timeline { stats.read_lock_acquisition_micros = DurationRecorder::Recorded(RecordedDuration(now - begin), now); let layer_removal_cs = layer_removal_cs.clone(); - tokio::task::spawn_blocking(move || { - let _entered = phase1_span.enter(); - myself.compact_level0_phase1( - layer_removal_cs, - phase1_layers_locked, - stats, - target_file_size, - &ctx, - ) - }) - .await - .context("spawn_blocking")?? + self.compact_level0_phase1( + layer_removal_cs, + phase1_layers_locked, + stats, + target_file_size, + &ctx, + ) + .instrument(phase1_span) + .await? }; if new_layers.is_empty() && deltas_to_compact.is_empty() {