From 4e359db4c78e0fd3d3e8f6a69baac4fb5b80b752 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 26 May 2023 17:15:47 -0400 Subject: [PATCH] pgserver: spawn_blocking in compaction (#4265) Compaction is usually a compute-heavy process and might affect other futures running on the thread of the compaction. Therefore, we add `block_in_place` as a temporary solution to avoid blocking other futures on the same thread as compaction in the runtime. As we are migrating towards a fully-async-style pageserver, we can revert this change when everything is async and when we move compaction to a separate runtime. --------- Signed-off-by: Alex Chi --- pageserver/src/context.rs | 3 +- pageserver/src/tenant/par_fsync.rs | 52 +++++++++++++---- pageserver/src/tenant/timeline.rs | 89 ++++++++++++++++++------------ 3 files changed, 98 insertions(+), 46 deletions(-) diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index e826d28e6d..f53b7736ab 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -88,6 +88,7 @@ use crate::task_mgr::TaskKind; // The main structure of this module, see module-level comment. +#[derive(Clone, Debug)] pub struct RequestContext { task_kind: TaskKind, download_behavior: DownloadBehavior, @@ -95,7 +96,7 @@ pub struct RequestContext { /// Desired behavior if the operation requires an on-demand download /// to proceed. -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum DownloadBehavior { /// Download the layer file. It can take a while. Download, diff --git a/pageserver/src/tenant/par_fsync.rs b/pageserver/src/tenant/par_fsync.rs index 0b0217ab58..3cbcfe8774 100644 --- a/pageserver/src/tenant/par_fsync.rs +++ b/pageserver/src/tenant/par_fsync.rs @@ -19,14 +19,8 @@ fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result Ok(()) } -pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> { - const PARALLEL_PATH_THRESHOLD: usize = 1; - if paths.len() <= PARALLEL_PATH_THRESHOLD { - for path in paths { - fsync_path(path)?; - } - return Ok(()); - } +fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> { + // TODO: remove this function in favor of `par_fsync_async` once we asyncify everything. /// Use at most this number of threads. /// Increasing this limit will @@ -36,11 +30,11 @@ pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> { let num_threads = paths.len().min(MAX_NUM_THREADS); let next_path_idx = AtomicUsize::new(0); - crossbeam_utils::thread::scope(|s| -> io::Result<()> { + std::thread::scope(|s| -> io::Result<()> { let mut handles = vec![]; // Spawn `num_threads - 1`, as the current thread is also a worker. for _ in 1..num_threads { - handles.push(s.spawn(|_| parallel_worker(paths, &next_path_idx))); + handles.push(s.spawn(|| parallel_worker(paths, &next_path_idx))); } parallel_worker(paths, &next_path_idx)?; @@ -51,5 +45,41 @@ pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> { Ok(()) }) - .unwrap() +} + +/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool. +pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> { + if paths.len() == 1 { + fsync_path(&paths[0])?; + return Ok(()); + } + + fsync_in_thread_pool(paths) +} + +/// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current +/// execution thread. Otherwise, we will spawn_blocking and run it in tokio. +pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> { + const MAX_CONCURRENT_FSYNC: usize = 64; + let mut next = paths.iter().peekable(); + let mut js = tokio::task::JoinSet::new(); + loop { + while js.len() < MAX_CONCURRENT_FSYNC && next.peek().is_some() { + let next = next.next().expect("just peeked"); + let next = next.to_owned(); + js.spawn_blocking(move || fsync_path(&next)); + } + + // now the joinset has been filled up, wait for next to complete + if let Some(res) = js.join_next().await { + res??; + } else { + // last item had already completed + assert!( + next.peek().is_none(), + "joinset emptied, we shouldn't have more work" + ); + return Ok(()); + } + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b0aca45882..4bfebd93df 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -195,8 +195,9 @@ pub struct Timeline { /// Layer removal lock. /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks. /// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`], - /// and [`Tenant::delete_timeline`]. - pub(super) layer_removal_cs: tokio::sync::Mutex<()>, + /// and [`Tenant::delete_timeline`]. This is an `Arc` lock because we need an owned + /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`). + pub(super) layer_removal_cs: Arc>, // Needed to ensure that we can't create a branch at a point that was already garbage collected pub latest_gc_cutoff_lsn: Rcu, @@ -669,7 +670,7 @@ impl Timeline { } /// Outermost timeline compaction operation; downloads needed layers. - pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> { + pub async fn compact(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { const ROUNDS: usize = 2; let last_record_lsn = self.get_last_record_lsn(); @@ -758,7 +759,7 @@ impl Timeline { } /// Compaction which might need to be retried after downloading remote layers. - async fn compact_inner(&self, ctx: &RequestContext) -> Result<(), CompactionError> { + async fn compact_inner(self: &Arc, ctx: &RequestContext) -> Result<(), CompactionError> { // // High level strategy for compaction / image creation: // @@ -793,7 +794,7 @@ impl Timeline { // Below are functions compact_level0() and create_image_layers() // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. - let layer_removal_cs = self.layer_removal_cs.lock().await; + let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); // Is the timeline being deleted? let state = *self.state.borrow(); if state == TimelineState::Stopping { @@ -827,7 +828,7 @@ impl Timeline { // 3. Compact let timer = self.metrics.compact_time_histo.start_timer(); - self.compact_level0(&layer_removal_cs, target_file_size, ctx) + self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx) .await?; timer.stop_and_record(); } @@ -2168,7 +2169,7 @@ impl Timeline { fn delete_historic_layer( &self, // we cannot remove layers otherwise, since gc and compaction will race - _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, + _layer_removal_cs: Arc>, layer: Arc, updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, ) -> anyhow::Result<()> { @@ -2632,7 +2633,7 @@ impl Timeline { /// Layer flusher task's main loop. async fn flush_loop( - &self, + self: &Arc, mut layer_flush_start_rx: tokio::sync::watch::Receiver, ctx: &RequestContext, ) { @@ -2723,7 +2724,7 @@ impl Timeline { /// Flush one frozen in-memory layer to disk, as a new delta layer. #[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))] async fn flush_frozen_layer( - &self, + self: &Arc, frozen_layer: Arc, ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -2743,7 +2744,11 @@ impl Timeline { .await? } else { // normal case, write out a L0 delta layer file. - let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?; + let this = self.clone(); + let frozen_layer = frozen_layer.clone(); + let (delta_path, metadata) = + tokio::task::spawn_blocking(move || this.create_delta_layer(&frozen_layer)) + .await??; HashMap::from([(delta_path, metadata)]) }; @@ -2847,7 +2852,7 @@ impl Timeline { // Write out the given frozen in-memory layer as a new L0 delta file fn create_delta_layer( - &self, + self: &Arc, frozen_layer: &InMemoryLayer, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { // Write it out @@ -2863,10 +2868,13 @@ impl Timeline { // TODO: If we're running inside 'flush_frozen_layers' and there are multiple // files to flush, it might be better to first write them all, and then fsync // them all in parallel. - par_fsync::par_fsync(&[ - new_delta_path.clone(), - self.conf.timeline_path(&self.timeline_id, &self.tenant_id), - ])?; + + // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace + // this with a single fsync in future refactors. + par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; + // Then sync the parent directory. + par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)]) + .context("fsync of timeline dir")?; // Add it to the layer map let l = Arc::new(new_delta); @@ -3090,11 +3098,15 @@ impl Timeline { let all_paths = image_layers .iter() .map(|layer| layer.path()) - .chain(std::iter::once( - self.conf.timeline_path(&self.timeline_id, &self.tenant_id), - )) .collect::>(); - par_fsync::par_fsync(&all_paths).context("fsync of newly created layer files")?; + + par_fsync::par_fsync_async(&all_paths) + .await + .context("fsync of newly created layer files")?; + + par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)]) + .await + .context("fsync of timeline dir")?; let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); @@ -3159,9 +3171,9 @@ impl Timeline { /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the /// start of level0 files compaction, the on-demand download should be revisited as well. - async fn compact_level0_phase1( + fn compact_level0_phase1( &self, - _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, + _layer_removal_cs: Arc>, target_file_size: u64, ctx: &RequestContext, ) -> Result { @@ -3474,13 +3486,13 @@ impl Timeline { if !new_layers.is_empty() { let mut layer_paths: Vec = new_layers.iter().map(|l| l.path()).collect(); - // also sync the directory - layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id)); - // Fsync all the layer files and directory using multiple threads to // minimize latency. par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?; + par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)]) + .context("fsync of timeline dir")?; + layer_paths.pop().unwrap(); } @@ -3497,17 +3509,22 @@ impl Timeline { /// as Level 1 files. /// async fn compact_level0( - &self, - layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, + self: &Arc, + layer_removal_cs: Arc>, target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { + let this = self.clone(); + let ctx_inner = ctx.clone(); + let layer_removal_cs_inner = layer_removal_cs.clone(); let CompactLevel0Phase1Result { new_layers, deltas_to_compact, - } = self - .compact_level0_phase1(layer_removal_cs, target_file_size, ctx) - .await?; + } = tokio::task::spawn_blocking(move || { + this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner) + }) + .await + .unwrap()?; if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do @@ -3565,7 +3582,7 @@ impl Timeline { let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); for l in deltas_to_compact { layer_names_to_delete.push(l.filename()); - self.delete_historic_layer(layer_removal_cs, l, &mut updates)?; + self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?; } updates.flush(); drop(layers); @@ -3685,7 +3702,7 @@ impl Timeline { fail_point!("before-timeline-gc"); - let layer_removal_cs = self.layer_removal_cs.lock().await; + let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); // Is the timeline being deleted? let state = *self.state.borrow(); if state == TimelineState::Stopping { @@ -3705,7 +3722,7 @@ impl Timeline { let res = self .gc_timeline( - &layer_removal_cs, + layer_removal_cs.clone(), horizon_cutoff, pitr_cutoff, retain_lsns, @@ -3724,7 +3741,7 @@ impl Timeline { async fn gc_timeline( &self, - layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, + layer_removal_cs: Arc>, horizon_cutoff: Lsn, pitr_cutoff: Lsn, retain_lsns: Vec, @@ -3897,7 +3914,11 @@ impl Timeline { { for doomed_layer in layers_to_remove { layer_names_to_delete.push(doomed_layer.filename()); - self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning? + self.delete_historic_layer( + layer_removal_cs.clone(), + doomed_layer, + &mut updates, + )?; // FIXME: schedule succeeded deletions before returning? result.layers_removed += 1; } }