diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 122331ac19..060000a01a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -129,7 +129,7 @@ pub struct Timeline { pub pg_version: u32, - pub(crate) layers: tokio::sync::RwLock>, + pub(crate) layers: Arc>>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -1418,7 +1418,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: tokio::sync::RwLock::new(LayerMap::default()), + layers: Arc::new(tokio::sync::RwLock::new(LayerMap::default())), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -3370,14 +3370,14 @@ struct CompactLevel0Phase1StatsBuilder { version: Option, tenant_id: Option, timeline_id: Option, - first_read_lock_acquisition_micros: DurationRecorder, - get_level0_deltas_plus_drop_lock_micros: DurationRecorder, - level0_deltas_count: Option, - time_spent_between_locks: DurationRecorder, - second_read_lock_acquisition_micros: DurationRecorder, - second_read_lock_held_micros: DurationRecorder, - sort_holes_micros: DurationRecorder, + read_lock_acquisition_micros: DurationRecorder, + read_lock_held_spawn_blocking_startup_micros: DurationRecorder, + read_lock_held_prerequisites_micros: DurationRecorder, + read_lock_held_compute_holes_micros: DurationRecorder, + read_lock_drop_micros: DurationRecorder, + prepare_iterators_micros: DurationRecorder, write_layer_files_micros: DurationRecorder, + level0_deltas_count: Option, new_deltas_count: Option, new_deltas_size: Option, } @@ -3390,14 +3390,14 @@ struct CompactLevel0Phase1Stats { tenant_id: TenantId, #[serde_as(as = "serde_with::DisplayFromStr")] timeline_id: TimelineId, - first_read_lock_acquisition_micros: RecordedDuration, - get_level0_deltas_plus_drop_lock_micros: RecordedDuration, - level0_deltas_count: usize, - time_spent_between_locks: RecordedDuration, - second_read_lock_acquisition_micros: RecordedDuration, - second_read_lock_held_micros: RecordedDuration, - sort_holes_micros: RecordedDuration, + read_lock_acquisition_micros: RecordedDuration, + read_lock_held_spawn_blocking_startup_micros: RecordedDuration, + read_lock_held_prerequisites_micros: RecordedDuration, + read_lock_held_compute_holes_micros: RecordedDuration, + read_lock_drop_micros: RecordedDuration, + prepare_iterators_micros: RecordedDuration, write_layer_files_micros: RecordedDuration, + level0_deltas_count: usize, new_deltas_count: usize, new_deltas_size: u64, } @@ -3406,54 +3406,51 @@ impl TryFrom for CompactLevel0Phase1Stats { type Error = anyhow::Error; fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result { - let CompactLevel0Phase1StatsBuilder { - version, - tenant_id, - timeline_id, - first_read_lock_acquisition_micros, - get_level0_deltas_plus_drop_lock_micros, - level0_deltas_count, - time_spent_between_locks, - second_read_lock_acquisition_micros, - second_read_lock_held_micros, - sort_holes_micros, - write_layer_files_micros, - new_deltas_count, - new_deltas_size, - } = value; - Ok(CompactLevel0Phase1Stats { - version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?, - tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?, - timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?, - first_read_lock_acquisition_micros: first_read_lock_acquisition_micros + Ok(Self { + version: value.version.ok_or_else(|| anyhow!("version not set"))?, + tenant_id: value + .tenant_id + .ok_or_else(|| anyhow!("tenant_id not set"))?, + timeline_id: value + .timeline_id + .ok_or_else(|| anyhow!("timeline_id not set"))?, + read_lock_acquisition_micros: value + .read_lock_acquisition_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?, - get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros + .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?, + read_lock_held_spawn_blocking_startup_micros: value + .read_lock_held_spawn_blocking_startup_micros .into_recorded() - .ok_or_else(|| { - anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set") - })?, - level0_deltas_count: level0_deltas_count - .ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?, - time_spent_between_locks: time_spent_between_locks + .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?, + read_lock_held_prerequisites_micros: value + .read_lock_held_prerequisites_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?, - second_read_lock_acquisition_micros: second_read_lock_acquisition_micros + .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?, + read_lock_held_compute_holes_micros: value + .read_lock_held_compute_holes_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?, - second_read_lock_held_micros: second_read_lock_held_micros + .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?, + read_lock_drop_micros: value + .read_lock_drop_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?, - sort_holes_micros: sort_holes_micros + .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?, + prepare_iterators_micros: value + .prepare_iterators_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?, - write_layer_files_micros: write_layer_files_micros + .ok_or_else(|| anyhow!("prepare_iterators_micros not set"))?, + write_layer_files_micros: value + .write_layer_files_micros .into_recorded() - .ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?, - new_deltas_count: new_deltas_count - .ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?, - new_deltas_size: new_deltas_size - .ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?, + .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?, + level0_deltas_count: value + .level0_deltas_count + .ok_or_else(|| anyhow!("level0_deltas_count not set"))?, + new_deltas_count: value + .new_deltas_count + .ok_or_else(|| anyhow!("new_deltas_count not set"))?, + new_deltas_size: value + .new_deltas_size + .ok_or_else(|| anyhow!("new_deltas_size not set"))?, }) } } @@ -3464,30 +3461,18 @@ impl Timeline { /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the /// start of level0 files compaction, the on-demand download should be revisited as well. - async fn compact_level0_phase1( - &self, + fn compact_level0_phase1( + self: Arc, _layer_removal_cs: Arc>, + layers: tokio::sync::OwnedRwLockReadGuard>, + mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, ctx: &RequestContext, ) -> Result { - let mut stats = CompactLevel0Phase1StatsBuilder { - version: Some(1), - tenant_id: Some(self.tenant_id), - timeline_id: Some(self.timeline_id), - ..Default::default() - }; - - let begin = tokio::time::Instant::now(); - let layers = self.layers.read().await; - let now = tokio::time::Instant::now(); - stats.first_read_lock_acquisition_micros = - DurationRecorder::Recorded(RecordedDuration(now - begin), now); + stats.read_lock_held_spawn_blocking_startup_micros = + stats.read_lock_acquisition_micros.till_now(); // set by caller let mut level0_deltas = layers.get_level0_deltas()?; - drop(layers); stats.level0_deltas_count = Some(level0_deltas.len()); - stats.get_level0_deltas_plus_drop_lock_micros = - stats.first_read_lock_acquisition_micros.till_now(); - // Only compact if enough layers have accumulated. let threshold = self.get_compaction_threshold(); if level0_deltas.is_empty() || level0_deltas.len() < threshold { @@ -3565,6 +3550,53 @@ impl Timeline { // we don't accidentally use it later in the function. drop(level0_deltas); + stats.read_lock_held_prerequisites_micros = stats + .read_lock_held_spawn_blocking_startup_micros + .till_now(); + + // Determine N largest holes where N is number of compacted layers. + let max_holes = deltas_to_compact.len(); + let last_record_lsn = self.get_last_record_lsn(); + let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; + let min_hole_coverage_size = 3; // TODO: something more flexible? + + // min-heap (reserve space for one more element added before eviction) + let mut heap: BinaryHeap = BinaryHeap::with_capacity(max_holes + 1); + let mut prev: Option = None; + for (next_key, _next_lsn, _size) in itertools::process_results( + deltas_to_compact.iter().map(|l| l.key_iter(ctx)), + |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0), + )? { + if let Some(prev_key) = prev { + // just first fast filter + if next_key.to_i128() - prev_key.to_i128() >= min_hole_range { + let key_range = prev_key..next_key; + // Measuring hole by just subtraction of i128 representation of key range boundaries + // has not so much sense, because largest holes will corresponds field1/field2 changes. + // But we are mostly interested to eliminate holes which cause generation of excessive image layers. + // That is why it is better to measure size of hole as number of covering image layers. + let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len(); + if coverage_size >= min_hole_coverage_size { + heap.push(Hole { + key_range, + coverage_size, + }); + if heap.len() > max_holes { + heap.pop(); // remove smallest hole + } + } + } + } + prev = Some(next_key.next()); + } + stats.read_lock_held_compute_holes_micros = + stats.read_lock_held_prerequisites_micros.till_now(); + drop(layers); + stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now(); + let mut holes = heap.into_vec(); + holes.sort_unstable_by_key(|hole| hole.key_range.start); + let mut next_hole = 0; // index of next hole in holes vector + // This iterator walks through all key-value pairs from all the layers // we're compacting, in key, LSN order. let all_values_iter = itertools::process_results( @@ -3604,50 +3636,7 @@ impl Timeline { }, )?; - // Determine N largest holes where N is number of compacted layers. - let max_holes = deltas_to_compact.len(); - let last_record_lsn = self.get_last_record_lsn(); - stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now(); - let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? - stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now(); - let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; - let min_hole_coverage_size = 3; // TODO: something more flexible? - - // min-heap (reserve space for one more element added before eviction) - let mut heap: BinaryHeap = BinaryHeap::with_capacity(max_holes + 1); - let mut prev: Option = None; - for (next_key, _next_lsn, _size) in itertools::process_results( - deltas_to_compact.iter().map(|l| l.key_iter(ctx)), - |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0), - )? { - if let Some(prev_key) = prev { - // just first fast filter - if next_key.to_i128() - prev_key.to_i128() >= min_hole_range { - let key_range = prev_key..next_key; - // Measuring hole by just subtraction of i128 representation of key range boundaries - // has not so much sense, because largest holes will corresponds field1/field2 changes. - // But we are mostly interested to eliminate holes which cause generation of excessive image layers. - // That is why it is better to measure size of hole as number of covering image layers. - let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len(); - if coverage_size >= min_hole_coverage_size { - heap.push(Hole { - key_range, - coverage_size, - }); - if heap.len() > max_holes { - heap.pop(); // remove smallest hole - } - } - } - } - prev = Some(next_key.next()); - } - drop(layers); - stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now(); - let mut holes = heap.into_vec(); - holes.sort_unstable_by_key(|hole| hole.key_range.start); - let mut next_hole = 0; // index of next hole in holes vector - stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now(); + stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now(); // Merge the contents of all the input delta layers into a new set // of delta layers, based on the current partitioning. @@ -3807,7 +3796,7 @@ impl Timeline { layer_paths.pop().unwrap(); } - stats.write_layer_files_micros = stats.sort_holes_micros.till_now(); + stats.write_layer_files_micros = stats.prepare_iterators_micros.till_now(); stats.new_deltas_count = Some(new_layers.len()); stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum()); @@ -3846,9 +3835,36 @@ impl Timeline { let CompactLevel0Phase1Result { new_layers, deltas_to_compact, - } = self - .compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx) - .await?; + } = { + let phase1_span = info_span!("compact_level0_phase1"); + let myself = Arc::clone(self); + let ctx = ctx.attached_child(); // technically, the spawn_blocking can outlive this future + let mut stats = CompactLevel0Phase1StatsBuilder { + version: Some(2), + tenant_id: Some(self.tenant_id), + timeline_id: Some(self.timeline_id), + ..Default::default() + }; + + let begin = tokio::time::Instant::now(); + let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await; + let now = tokio::time::Instant::now(); + stats.read_lock_acquisition_micros = + DurationRecorder::Recorded(RecordedDuration(now - begin), now); + let layer_removal_cs = layer_removal_cs.clone(); + tokio::task::spawn_blocking(move || { + let _entered = phase1_span.enter(); + myself.compact_level0_phase1( + layer_removal_cs, + phase1_layers_locked, + stats, + target_file_size, + &ctx, + ) + }) + .await + .context("spawn_blocking")?? + }; if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do