diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f0a3ba532a..0a1c03c72e 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -87,6 +87,8 @@ use super::storage_layer::{ DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, }; +const ENABLE_TIERED_COMPACTION: bool = false; + #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) enum FlushLoopState { NotStarted, @@ -857,6 +859,20 @@ impl Timeline { .await { Ok((partitioning, lsn)) => { + if !ENABLE_TIERED_COMPACTION { + // 2. Create new image layers for partitions that have been modified + // "enough". + let layer_paths_to_upload = self + .create_bottom_image_layers(&partitioning, lsn, false, ctx) + .await + .map_err(anyhow::Error::from)?; + if let Some(remote_client) = &self.remote_client { + for (path, layer_metadata) in layer_paths_to_upload { + remote_client.schedule_layer_file_upload(&path, &layer_metadata)?; + } + } + } + // 3. Compact let timer = self.metrics.compact_time_histo.start_timer(); self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx) @@ -3742,8 +3758,88 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { - self.compact_tiered(layer_removal_cs, target_file_size, ctx) - .await + if ENABLE_TIERED_COMPACTION { + return self + .compact_tiered(layer_removal_cs, target_file_size, ctx) + .await; + } + + let CompactLevel0Phase1Result { + new_layers, + deltas_to_compact, + } = self + .compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx) + .await?; + + if new_layers.is_empty() && deltas_to_compact.is_empty() { + // nothing to do + return Ok(()); + } + + // Before deleting any layers, we need to wait for their upload ops to finish. + // See storage_sync module level comment on consistency. + // Do it here because we don't want to hold self.layers.write() while waiting. + if let Some(remote_client) = &self.remote_client { + debug!("waiting for upload ops to complete"); + remote_client + .wait_completion() + .await + .context("wait for layer upload ops to complete")?; + } + + let mut guard = self.layers.write().await; + let (layers, _) = &mut *guard; + let mut updates = layers.batch_update(); + let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); + for l in new_layers { + let new_delta_path = l.path(); + + let metadata = new_delta_path.metadata().with_context(|| { + format!( + "read file metadata for new created layer {}", + new_delta_path.display() + ) + })?; + + if let Some(remote_client) = &self.remote_client { + remote_client.schedule_layer_file_upload( + &l.filename(), + &LayerFileMetadata::new(metadata.len()), + )?; + } + + // update the timeline's physical size + self.metrics + .resident_physical_size_gauge + .add(metadata.len()); + + new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); + let x: Arc = Arc::new(l); + x.access_stats().record_residence_event( + &updates, + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + updates.insert_historic(x.layer_desc().clone()); + self.lcache.create_new_layer(x); + } + + // Now that we have reshuffled the data to set of new delta layers, we can + // delete the old ones + let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); + for l in deltas_to_compact { + layer_names_to_delete.push(l.filename()); + self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?; + } + updates.flush(); + drop_wlock(guard); + + // Also schedule the deletions in remote storage + if let Some(remote_client) = &self.remote_client { + remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?; + } + + Ok(()) } fn get_compact_task(tier_sizes: Vec<(usize, u64)>) -> Option> {