diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index f86657fa77..d2a7adc5fe 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1028,7 +1028,7 @@ async fn timeline_compact_handler(
         timeline
             .compact(&cancel, &ctx)
             .await
-            .map_err(ApiError::InternalServerError)?;
+            .map_err(|e| ApiError::InternalServerError(e.into()))?;
         json_response(StatusCode::OK, ())
     }
     .instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
@@ -1053,7 +1053,7 @@ async fn timeline_checkpoint_handler(
         timeline
             .compact(&cancel, &ctx)
             .await
-            .map_err(ApiError::InternalServerError)?;
+            .map_err(|e| ApiError::InternalServerError(e.into()))?;
         json_response(StatusCode::OK, ())
     }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 5278fe108c..452f6423eb 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -626,9 +626,7 @@ impl Timeline {
         self: &Arc<Self>,
         cancel: &CancellationToken,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
-        const ROUNDS: usize = 2;
-
+    ) -> Result<(), CompactionError> {
         static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
             once_cell::sync::Lazy::new(|| {
                 let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
@@ -670,87 +668,6 @@ impl Timeline {
             return Ok(());
         }
 
-        // retry two times to allow first round to find layers which need to be downloaded, then
-        // download them, then retry compaction
-        for round in 0..ROUNDS {
-            // should we error out with the most specific error?
-            let last_round = round == ROUNDS - 1;
-
-            let res = self.compact_inner(ctx).await;
-
-            // If `create_image_layers' or `compact_level0` scheduled any
-            // uploads or deletions, but didn't update the index file yet,
-            // do it now.
-            //
-            // This isn't necessary for correctness, the remote state is
-            // consistent without the uploads and deletions, and we would
-            // update the index file on next flush iteration too. But it
-            // could take a while until that happens.
-            //
-            // Additionally, only do this once before we return from this function.
-            if last_round || res.is_ok() {
-                if let Some(remote_client) = &self.remote_client {
-                    remote_client.schedule_index_upload_for_file_changes()?;
-                }
-            }
-
-            let rls = match res {
-                Ok(()) => return Ok(()),
-                Err(CompactionError::DownloadRequired(rls)) if !last_round => {
-                    // this can be done at most one time before exiting, waiting
-                    rls
-                }
-                Err(CompactionError::DownloadRequired(rls)) => {
-                    anyhow::bail!("Compaction requires downloading multiple times (last was {} layers), possibly battling against eviction", rls.len())
-                }
-                Err(CompactionError::ShuttingDown) => {
-                    return Ok(());
-                }
-                Err(CompactionError::Other(e)) => {
-                    return Err(e);
-                }
-            };
-
-            // this path can be visited in the second round of retrying, if first one found that we
-            // must first download some remote layers
-            let total = rls.len();
-
-            let mut downloads = rls
-                .into_iter()
-                .map(|rl| self.download_remote_layer(rl))
-                .collect::<FuturesUnordered<_>>();
-
-            let mut failed = 0;
-
-            loop {
-                tokio::select! {
-                    _ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
-                    res = downloads.next() => {
-                        match res {
-                            Some(Ok(())) => {},
-                            Some(Err(e)) => {
-                                warn!("Downloading remote layer for compaction failed: {e:#}");
-                                failed += 1;
-                            }
-                            None => break,
-                        }
-                    }
-                }
-            }
-
-            if failed != 0 {
-                anyhow::bail!("{failed} out of {total} layers failed to download, retrying later");
-            }
-
-            // if everything downloaded fine, lets try again
-        }
-
-        unreachable!("retry loop exits")
-    }
-
-    /// Compaction which might need to be retried after downloading remote layers.
-    async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
-        //
         // High level strategy for compaction / image creation:
         //
         // 1. First, calculate the desired "partitioning" of the
@@ -795,6 +712,7 @@ impl Timeline {
 
         // Define partitioning schema if needed
 
+        // FIXME: the match should only cover repartitioning, not the next steps
         match self
             .repartition(
                 self.get_last_record_lsn(),
@@ -2876,28 +2794,18 @@ impl Timeline {
 
 #[derive(Default)]
 struct CompactLevel0Phase1Result {
-    new_layers: Vec<Arc<DeltaLayer>>,
-    deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
+    new_layers: Vec<ResidentLayer>,
+    deltas_to_compact: Vec<Arc<LayerE>>,
 }
 
 /// Top-level failure to compact.
-#[derive(Debug)]
+#[derive(Debug, thiserror::Error)]
 enum CompactionError {
-    /// L0 compaction requires layers to be downloaded.
-    ///
-    /// This should not happen repeatedly, but will be retried once by top-level
-    /// `Timeline::compact`.
-    DownloadRequired(Vec<Arc<RemoteLayer>>),
-    /// The timeline or pageserver is shutting down
+    #[error("The timeline or pageserver is shutting down")]
     ShuttingDown,
     /// Compaction cannot be done right now; page reconstruction and so on.
-    Other(anyhow::Error),
-}
-
-impl From<anyhow::Error> for CompactionError {
-    fn from(value: anyhow::Error) -> Self {
-        CompactionError::Other(value)
-    }
+    #[error(transparent)]
+    Other(#[from] anyhow::Error),
 }
 
 #[serde_as]
@@ -3022,13 +2930,11 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
 }
 
 impl Timeline {
-    /// Level0 files first phase of compaction, explained in the [`compact_inner`] comment.
+    /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
     ///
    /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
     /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
     /// start of level0 files compaction, the on-demand download should be revisited as well.
-    ///
-    /// [`compact_inner`]: Self::compact_inner
     async fn compact_level0_phase1(
         self: &Arc<Self>,
         _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
@@ -3073,19 +2979,32 @@
         // size length. Compaction will likely create the same set of n files afterwards.
         //
         // This failpoint is a superset of both of the cases.
- fail_point!("compact-level0-phase1-return-same", |_| { - println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint - Ok(CompactLevel0Phase1Result { - new_layers: level0_deltas - .iter() - .map(|x| x.clone().downcast_delta_layer().unwrap()) - .collect(), - deltas_to_compact: level0_deltas - .iter() - .map(|x| x.layer_desc().clone().into()) - .collect(), - }) - }); + if cfg!(feature = "testing") { + // FIXME: utils does not depend on `fail` so there's no non-macro answer to this + let active = (|| { + ::fail::fail_point!("compact-level0-phase1-return-same", |_| true); + false + })(); + + if active { + let mut new_layers = Vec::with_capacity(level0_deltas.len()); + for delta in &level0_deltas { + // we are just faking these layers as being produced again for this failpoint + new_layers.push( + delta + .guard_against_eviction(true) + .await + .context("download layer for failpoint")?, + ); + } + // FIXME: println + println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint + return Ok(CompactLevel0Phase1Result { + new_layers, + deltas_to_compact: level0_deltas, + }); + } + } // Gather the files to compact in this iteration. // @@ -3105,14 +3024,15 @@ impl Timeline { let first_level0_delta = level0_deltas_iter.next().unwrap(); let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end; - let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)]; + let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len()); + deltas_to_compact.push(first_level0_delta.clone()); for l in level0_deltas_iter { let lsn_range = &l.layer_desc().lsn_range; if lsn_range.start != prev_lsn_end { break; } - deltas_to_compact.push(Arc::clone(l)); + deltas_to_compact.push(l.clone()); prev_lsn_end = lsn_range.end; } let lsn_range = Range { @@ -3125,24 +3045,6 @@ impl Timeline { end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end, }; - let remotes = deltas_to_compact - .iter() - .filter(|l| l.is_remote_layer()) - .inspect(|l| info!("compact requires download of {l}")) - .map(|l| { - l.clone() - .downcast_remote_layer() - .expect("just checked it is remote layer") - }) - .collect::>(); - - if !remotes.is_empty() { - // caller is holding the lock to layer_removal_cs, and we don't want to download while - // holding that; in future download_remote_layer might take it as well. this is - // regardless of earlier image creation downloading on-demand, while holding the lock. - return Err(CompactionError::DownloadRequired(remotes)); - } - info!( "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)", lsn_range.start, @@ -3175,23 +3077,19 @@ impl Timeline { let mut all_keys = Vec::new(); - let downcast_deltas: Vec<_> = deltas_to_compact - .iter() - .map(|l| l.clone().downcast_delta_layer().expect("delta layer")) - .collect(); - for dl in downcast_deltas.iter() { - // TODO: replace this with an await once we fully go async - all_keys.extend(DeltaLayer::load_keys(dl, ctx).await?); + for l in deltas_to_compact.iter() { + all_keys.extend(l.load_keys(ctx).await?); } + // FIXME: should spawn_blocking the rest of this function + // The current stdlib sorting implementation is designed in a way where it is // particularly fast where the slice is made up of sorted sub-ranges. all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn)); stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now(); - for DeltaEntry { key: next_key, .. 
-            let next_key = *next_key;
+        for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
             if let Some(prev_key) = prev {
                 // just first fast filter
                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
@@ -3351,9 +3249,12 @@
                 || contains_hole
             {
                 // ... if so, flush previous layer and prepare to write new one
-                new_layers.push(Arc::new(
-                    writer.take().unwrap().finish(prev_key.unwrap().next())?,
-                ));
+                new_layers.push(
+                    writer
+                        .take()
+                        .unwrap()
+                        .finish(prev_key.unwrap().next(), self)?,
+                );
                 writer = None;
 
                 if contains_hole {
@@ -3393,7 +3294,7 @@
             prev_key = Some(key);
         }
         if let Some(writer) = writer {
-            new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?));
+            new_layers.push(writer.finish(prev_key.unwrap().next(), &self)?);
         }
 
         // Sync layers
@@ -3411,10 +3312,17 @@
             );
             }
         }
-        let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
+
+        // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
+        let mut layer_paths: Vec<PathBuf> = new_layers
+            .iter()
+            .map(|l| l.local_path().to_owned())
+            .collect();
 
         // Fsync all the layer files and directory using multiple threads to
         // minimize latency.
+        //
+        // FIXME: spawn_blocking above for this
         par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
 
         par_fsync::par_fsync(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
@@ -3443,10 +3351,7 @@
         Ok(CompactLevel0Phase1Result {
             new_layers,
-            deltas_to_compact: deltas_to_compact
-                .into_iter()
-                .map(|x| Arc::new(x.layer_desc().clone()))
-                .collect(),
+            deltas_to_compact,
         })
     }
 
@@ -3507,7 +3412,6 @@
         }
 
         let mut guard = self.layers.write().await;
-        let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
 
         // In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction.
         // We should move to numbering the layer files instead of naming them using key range / LSN some day. But for
@@ -3515,63 +3419,45 @@
         let mut duplicated_layers = HashSet::new();
 
         let mut insert_layers = Vec::new();
-        let mut remove_layers = Vec::new();
 
         for l in new_layers {
-            let new_delta_path = l.path();
-
-            let metadata = new_delta_path.metadata().with_context(|| {
-                format!(
-                    "read file metadata for new created layer {}",
-                    new_delta_path.display()
-                )
-            })?;
-
             if let Some(remote_client) = &self.remote_client {
-                remote_client.schedule_layer_file_upload(
-                    &l.filename(),
-                    &LayerFileMetadata::new(metadata.len()),
-                )?;
+                let m = LayerFileMetadata::new(l.layer_desc().file_size);
+                // upload even if duplicated, because we may have changed the contents
+                remote_client.schedule_layer_file_upload(l.clone(), &m)?;
             }
-
-            // update the timeline's physical size
-            self.metrics
-                .resident_physical_size_gauge
-                .add(metadata.len());
-
-            new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
-            l.access_stats().record_residence_event(
-                LayerResidenceStatus::Resident,
-                LayerResidenceEventReason::LayerCreate,
-            );
-            let l = l as Arc<dyn PersistentLayer>;
-            if guard.contains(&l) {
+            if guard.contains(l.as_ref()) {
                 duplicated_layers.insert(l.layer_desc().key());
+            } else if LayerMap::is_l0(l.layer_desc()) {
+                return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
             } else {
-                if LayerMap::is_l0(l.layer_desc()) {
-                    return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
-                }
+                // update the timeline's physical size
+                self.metrics
+                    .resident_physical_size_gauge
+                    .add(l.layer_desc().file_size);
                 insert_layers.push(l);
             }
         }
 
+        let remove_layers = {
+            let mut deltas_to_compact = deltas_to_compact;
+            // only remove those inputs which were not outputs
+            deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
+            deltas_to_compact
+        };
+
         // Now that we have reshuffled the data to set of new delta layers, we can
         // delete the old ones
-        let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
-        for ldesc in deltas_to_compact {
-            if duplicated_layers.contains(&ldesc.key()) {
-                // skip duplicated layers, they will not be removed; we have already overwritten them
-                // with new layers in the compaction phase 1.
-                continue;
-            }
-            layer_names_to_delete.push(ldesc.filename());
-            remove_layers.push(guard.get_from_desc(&ldesc));
+        let mut layer_names_to_delete = Vec::with_capacity(remove_layers.len());
+
+        for delta in &remove_layers {
+            layer_names_to_delete.push(delta.layer_desc().filename());
         }
 
         guard.finish_compact_l0(
             &layer_removal_cs,
             remove_layers,
-            insert_layers,
+            &insert_layers,
             &self.metrics,
         )?;
@@ -3579,6 +3465,7 @@
 
         // Also schedule the deletions in remote storage
         if let Some(remote_client) = &self.remote_client {
+            // FIXME: this needs to be moved to LayerE::drop possibly?
            remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
        }
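
A note on the error plumbing above: the `CompactionError` rework replaces the hand-written `From<anyhow::Error>` impl with `thiserror`'s `#[from]`, and that same derive is what lets the HTTP handlers re-wrap the error with a plain `e.into()`. A minimal self-contained sketch of this plumbing, borrowing only the names from the diff (the `ApiError` shape is simplified and the function bodies are invented for illustration):

    use anyhow::anyhow;

    #[derive(Debug, thiserror::Error)]
    enum CompactionError {
        #[error("The timeline or pageserver is shutting down")]
        ShuttingDown,
        #[error(transparent)]
        Other(#[from] anyhow::Error),
    }

    #[derive(Debug)]
    enum ApiError {
        InternalServerError(anyhow::Error),
    }

    fn compact() -> Result<(), CompactionError> {
        let res: anyhow::Result<()> = Err(anyhow!("page reconstruction failed"));
        // `?` converts anyhow::Error -> CompactionError via `#[from]`
        res?;
        Ok(())
    }

    fn handler() -> Result<(), ApiError> {
        // the thiserror derive gives CompactionError a std::error::Error impl,
        // which is what makes `e.into()` back to anyhow::Error possible here
        compact().map_err(|e| ApiError::InternalServerError(e.into()))
    }

    fn main() {
        // the handler surfaces the compaction failure as an internal error
        println!("{:?}", handler().unwrap_err());
    }

One behavioral consequence: the deleted retry loop used to swallow `ShuttingDown` into `Ok(())`; after this change callers of `compact` see it as an error unless it is handled elsewhere, which this diff does not show.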
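The failpoint rewrite works around `fail_point!` only being able to early-return from its enclosing function: an immediately-invoked closure turns that early return into a plain `bool`, which the diff then combines with `cfg!(feature = "testing")`. A standalone sketch of the same trick, assuming the `fail` crate with its `failpoints` feature enabled (the failpoint name is the one from the diff; the `main` harness is illustrative):

    fn failpoint_active() -> bool {
        // `fail_point!` expands to a possible `return` from the *enclosing
        // function*; here the enclosing function is the closure, so an active
        // failpoint makes the closure return `true`, and an inactive one
        // falls through to `false`
        (|| {
            fail::fail_point!("compact-level0-phase1-return-same", |_| true);
            false
        })()
    }

    fn main() {
        let scenario = fail::FailScenario::setup();
        // activate the failpoint the way tests would, e.g. via the FAILPOINTS
        // environment variable or programmatically:
        fail::cfg("compact-level0-phase1-return-same", "return").unwrap();
        assert!(failpoint_active());
        scenario.teardown();
    }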
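Both `FIXME: spawn_blocking` comments point at the same hazard: `par_fsync::par_fsync` performs synchronous fsyncs on an async executor thread. A sketch of the shape that fix could take, with a sequential stand-in for the pageserver's parallel fsync helper (the async wrapper and the stand-in module are hypothetical, not code from this PR):

    use anyhow::Context;
    use std::path::PathBuf;

    // stand-in for pageserver's `par_fsync::par_fsync`; the real helper fans
    // the fsyncs out over multiple threads to minimize latency
    mod par_fsync {
        use std::{fs, io, path::PathBuf};

        pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
            for p in paths {
                fs::File::open(p)?.sync_all()?;
            }
            Ok(())
        }
    }

    async fn fsync_layer_files(layer_paths: Vec<PathBuf>) -> anyhow::Result<()> {
        // the blocking syscalls run on tokio's blocking thread pool instead
        // of stalling an executor worker
        tokio::task::spawn_blocking(move || par_fsync::par_fsync(&layer_paths))
            .await
            .context("join fsync task")?
            .context("fsync all new layers")?;
        Ok(())
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        fsync_layer_files(vec![PathBuf::from("Cargo.toml")]).await
    }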
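Finally, the `remove_layers` block centralizes the duplicate handling the old loop did with `continue`: an input layer whose key range / LSN was re-generated byte-identically by phase 1 ("duplicated") has been overwritten in place, so it must stay resident, and only the remaining inputs are scheduled for deletion. A toy model of that filtering, with strings standing in for layer keys (all names hypothetical):

    use std::collections::HashSet;

    /// `inputs` are the layers fed into compaction; `duplicated` holds the
    /// keys that phase 1 re-emitted unchanged (i.e. inputs that are also outputs)
    fn layers_to_remove(
        mut inputs: Vec<&'static str>,
        duplicated: &HashSet<&str>,
    ) -> Vec<&'static str> {
        // only remove those inputs which were not outputs
        inputs.retain(|key| !duplicated.contains(key));
        inputs
    }

    fn main() {
        let duplicated = HashSet::from(["key_000A-000F__lsn_10-20"]);
        let inputs = vec!["key_000A-000F__lsn_10-20", "key_0010-001F__lsn_10-20"];
        // the duplicated layer survives; only the other input gets deleted
        assert_eq!(layers_to_remove(inputs, &duplicated), ["key_0010-001F__lsn_10-20"]);
    }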