diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs
index 276a005976..1173e8d8a0 100644
--- a/pageserver/src/tenant/layer_map.rs
+++ b/pageserver/src/tenant/layer_map.rs
@@ -93,6 +93,11 @@ pub struct LayerMap {
     /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
     /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
+
+    /// All sorted runs. For tiered compaction.
+    pub sorted_runs: Vec<(usize, Vec<Arc<PersistentLayerDesc>>)>,
+
+    next_tier_id: usize,
 }
 
 /// The primary update API for the layer map.
@@ -114,15 +119,34 @@ impl BatchedUpdates<'_> {
     ///
     // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
     pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
+        unimplemented!("insert_historic");
+    }
+
+    pub fn insert_historic_new(&mut self, layer_desc: PersistentLayerDesc) {
         self.layer_map.insert_historic_noflush(layer_desc)
     }
 
+    /// Get a reference to the current sorted runs.
+    pub fn sorted_runs(&mut self) -> &mut Vec<(usize, Vec<Arc<PersistentLayerDesc>>)> {
+        &mut self.layer_map.sorted_runs
+    }
+
+    pub fn next_tier_id(&mut self) -> usize {
+        let ret = self.layer_map.next_tier_id;
+        self.layer_map.next_tier_id += 1;
+        ret
+    }
+
     ///
     /// Remove an on-disk layer from the map.
     ///
     /// This should be called when the corresponding file on disk has been deleted.
     ///
     pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
+        unimplemented!("remove_historic");
+    }
+
+    pub fn remove_historic_new(&mut self, layer_desc: PersistentLayerDesc) {
         self.layer_map.remove_historic_noflush(layer_desc)
     }
 
@@ -644,6 +668,22 @@ impl LayerMap {
         for layer in self.iter_historic_layers() {
             layer.dump(verbose, ctx)?;
         }
+
+        println!("tiered compaction:");
+
+        println!("l0_deltas:");
+        for layer in &self.l0_delta_layers {
+            layer.dump(verbose, ctx)?;
+        }
+
+        println!("sorted_runs:");
+        for (_lvl, (tier_id, layers)) in self.sorted_runs.iter().enumerate() {
+            println!("tier {}", tier_id);
+            for layer in layers {
+                layer.dump(verbose, ctx)?;
+            }
+        }
+
         println!("End dump LayerMap");
         Ok(())
     }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 7ba5d218c1..69c3515649 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -857,18 +857,6 @@ impl Timeline {
             .await
         {
             Ok((partitioning, lsn)) => {
-                // 2. Create new image layers for partitions that have been modified
-                // "enough".
-                let layer_paths_to_upload = self
-                    .create_image_layers(&partitioning, lsn, false, ctx)
-                    .await
-                    .map_err(anyhow::Error::from)?;
-                if let Some(remote_client) = &self.remote_client {
-                    for (path, layer_metadata) in layer_paths_to_upload {
-                        remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
-                    }
-                }
-
                // 3. Compact
                let timer = self.metrics.compact_time_histo.start_timer();
                self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
@@ -1801,7 +1789,7 @@ impl Timeline {
                    anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
                } else {
                    self.metrics.resident_physical_size_gauge.sub(local_size);
-                    updates.remove_historic(local_layer.layer_desc().clone());
+                    updates.remove_historic_new(local_layer.layer_desc().clone());
                    self.lcache.remove_local_when_init(local_layer);
                    // fall-through to adding the remote layer
                }
@@ -2299,9 +2287,20 @@ impl Timeline {
         None
     }
 
+    fn delete_historic_layer(
+        &self,
+        // we cannot remove layers otherwise, since gc and compaction will race
+        _layer_removal_cs: DeleteGuard,
+        layer: Arc<dyn PersistentLayer>,
+        _updates: &mut BatchedUpdates<'_>,
+    ) -> anyhow::Result<()> {
+        warn!("not deleting the layer {layer:?} as old GC is not supposed to run");
+        return Ok(());
+    }
+
     /// Removes the layer from local FS (if present) and from memory.
     /// Remote storage is not affected by this operation.
-    fn delete_historic_layer(
+    fn delete_historic_layer_new(
         &self,
         // we cannot remove layers otherwise, since gc and compaction will race
         _layer_removal_cs: DeleteGuard,
@@ -2322,7 +2321,7 @@ impl Timeline {
         // won't be needed for page reconstruction for this timeline,
         // and mark what we can't delete yet as deleted from the layer
         // map index without actually rebuilding the index.
-        updates.remove_historic(layer.layer_desc().clone());
+        updates.remove_historic_new(layer.layer_desc().clone());
         self.lcache.delete_layer(layer);
 
         Ok(())
@@ -2911,7 +2910,7 @@ impl Timeline {
            let (partitioning, _lsn) = self
                .repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
                .await?;
-            self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
+            self.create_bottom_image_layers(&partitioning, self.initdb_lsn, true, ctx)
                .await?
        } else {
            #[cfg(test)]
@@ -2927,7 +2926,7 @@ impl Timeline {
            }
        }
 
        // normal case, write out a L0 delta layer file.
-            let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
+            let (delta_path, metadata) = self.create_l0_delta_layer(&frozen_layer).await?;
             HashMap::from([(delta_path, metadata)])
         };
@@ -3030,7 +3029,7 @@ impl Timeline {
     }
 
     // Write out the given frozen in-memory layer as a new L0 delta file
-    async fn create_delta_layer(
+    async fn create_l0_delta_layer(
         self: &Arc<Self>,
         frozen_layer: &Arc<InMemoryLayer>,
     ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
@@ -3081,7 +3080,7 @@ impl Timeline {
            LayerResidenceStatus::Resident,
            LayerResidenceEventReason::LayerCreate,
        );
-        batch_updates.insert_historic(l.layer_desc().clone());
+        batch_updates.insert_historic_new(l.layer_desc().clone());
         self.lcache.create_new_layer(l);
 
         batch_updates.flush();
@@ -3204,7 +3203,7 @@ impl Timeline {
         Ok(false)
     }
 
-    async fn create_image_layers(
+    async fn create_bottom_image_layers(
         &self,
         partitioning: &KeyPartitioning,
         lsn: Lsn,
@@ -3315,7 +3314,7 @@ impl Timeline {
         let (layers, _) = &mut *guard;
         let mut updates = layers.batch_update();
         let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
-
+        let mut sorted_run = vec![];
         for l in image_layers {
             let path = l.filename();
             let metadata = timeline_path
@@ -3334,9 +3333,16 @@ impl Timeline {
                LayerResidenceStatus::Resident,
                LayerResidenceEventReason::LayerCreate,
            );
-            updates.insert_historic(l.layer_desc().clone());
+            updates.insert_historic_new(l.layer_desc().clone());
+            sorted_run.push(Arc::new(l.layer_desc().clone()));
             self.lcache.create_new_layer(l);
         }
+
+        // add this layer to the end of all sorted runs; this is only done when initializing with init_lsn
+        // for now, and therefore the sorted runs are empty.
+        assert!(updates.sorted_runs().is_empty());
+        let tier_id = updates.next_tier_id();
+        updates.sorted_runs().push((tier_id, sorted_run));
         updates.flush();
         drop_wlock(guard);
         timer.stop_and_record();
@@ -3351,6 +3357,13 @@ struct CompactLevel0Phase1Result {
     deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
 }
 
+#[derive(Default)]
+struct CompactTieredPhase1Result {
+    new_layers: Vec<DeltaLayer>,
+    new_tier_at: usize,
+    removed_tiers: Vec<usize>,
+}
+
 /// Top-level failure to compact.
 #[derive(Debug)]
 enum CompactionError {
@@ -3735,8 +3748,8 @@ impl Timeline {
             .await?;
 
         if new_layers.is_empty() && deltas_to_compact.is_empty() {
-            // nothing to do
-            return Ok(());
+            // If L0 does not need to be compacted, look into other layers
+            return self.compact_tiered(layer_removal_cs, target_file_size, ctx).await;
         }
 
         // Before deleting any layers, we need to wait for their upload ops to finish.
@@ -3783,7 +3796,7 @@ impl Timeline {
                LayerResidenceStatus::Resident,
                LayerResidenceEventReason::LayerCreate,
            );
-            updates.insert_historic(x.layer_desc().clone());
+            updates.insert_historic_new(x.layer_desc().clone());
             self.lcache.create_new_layer(x);
         }
 
@@ -3792,7 +3805,7 @@ impl Timeline {
         let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
         for l in deltas_to_compact {
             layer_names_to_delete.push(l.filename());
-            self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
+            self.delete_historic_layer_new(layer_removal_cs.clone(), l, &mut updates)?;
         }
         updates.flush();
         drop_wlock(guard);
@@ -3805,6 +3818,289 @@ impl Timeline {
         Ok(())
     }
 
+    async fn compact_tiered_phase1(
+        &self,
+        _layer_removal_cs: DeleteGuard,
+        target_file_size: u64,
+        ctx: &RequestContext,
+    ) -> Result<Option<CompactTieredPhase1Result>, CompactionError> {
+        let guard = self.layers.read().await;
+        let (layers, _) = &*guard;
+
+        // Only compact if enough layers have accumulated.
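+        // NOTE: the threshold is hard-coded in this draft; tiered compaction only
+        // kicks in once at least `threshold` sorted runs exist in the layer map.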
+        let threshold = 8;
+        if layers.sorted_runs.len() < threshold {
+            debug!(
+                sorted_runs = layers.sorted_runs.len(),
+                threshold, "too few sorted runs to compact"
+            );
+            return Ok(None);
+        }
+
+        // Gather the files to compact in this iteration.
+        // TODO: leverage the properties that some layers do not overlap, kmerge is too costly
+
+        // This iterator walks through all key-value pairs from all the layers
+        // we're compacting, in key, LSN order.
+        let all_values_iter = itertools::process_results(
+            deltas_to_compact_layers.iter().map(|l| l.iter(ctx)),
+            |iter_iter| {
+                iter_iter.kmerge_by(|a, b| {
+                    if let Ok((a_key, a_lsn, _)) = a {
+                        if let Ok((b_key, b_lsn, _)) = b {
+                            match a_key.cmp(b_key) {
+                                Ordering::Less => true,
+                                Ordering::Equal => a_lsn <= b_lsn,
+                                Ordering::Greater => false,
+                            }
+                        } else {
+                            false
+                        }
+                    } else {
+                        true
+                    }
+                })
+            },
+        )?;
+
+        // This iterator walks through all keys and is needed to calculate size used by each key
+        let mut all_keys_iter = itertools::process_results(
+            deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)),
+            |iter_iter| {
+                iter_iter.kmerge_by(|a, b| {
+                    let (a_key, a_lsn, _) = a;
+                    let (b_key, b_lsn, _) = b;
+                    match a_key.cmp(b_key) {
+                        Ordering::Less => true,
+                        Ordering::Equal => a_lsn <= b_lsn,
+                        Ordering::Greater => false,
+                    }
+                })
+            },
+        )?;
+
+        // TODO(chi): support image layer generation
+
+        // TODO(chi): merge with compact l0
+
+        let mut new_layers = Vec::new();
+        let mut prev_key: Option<Key> = None;
+        let mut writer: Option<DeltaLayerWriter> = None;
+        let mut key_values_total_size = 0u64;
+        let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
+        let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
+        for x in all_values_iter {
+            let (key, lsn, value) = x?;
+            let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
+            // We need to check key boundaries once we reach next key or end of layer with the same key
+            if !same_key || lsn == dup_end_lsn {
+                let mut next_key_size = 0u64;
+                let is_dup_layer = dup_end_lsn.is_valid();
+                dup_start_lsn = Lsn::INVALID;
+                if !same_key {
+                    dup_end_lsn = Lsn::INVALID;
+                }
+                // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
+                for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
+                    next_key_size = next_size;
+                    if key != next_key {
+                        if dup_end_lsn.is_valid() {
+                            // We are writing a segment with duplicates:
+                            // place all remaining values of this key in a separate segment
+                            dup_start_lsn = dup_end_lsn; // new segment starts where the old one stops
+                            dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
+                        }
+                        break;
+                    }
+                    key_values_total_size += next_size;
+                    // Check if it is time to split segment: if total keys size is larger than target file size.
+                    // We need to avoid generation of empty segments if next_size > target_file_size.
+                    if key_values_total_size > target_file_size && lsn != next_lsn {
+                        // Split key between multiple layers: such a layer can contain only a single key
+                        dup_start_lsn = if dup_end_lsn.is_valid() {
+                            dup_end_lsn // new segment with duplicates starts where the old one stops
+                        } else {
+                            lsn // start with the first LSN for this key
+                        };
+                        dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
+                        break;
+                    }
+                }
+                // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
+                if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
+                    dup_start_lsn = dup_end_lsn;
+                    dup_end_lsn = lsn_range.end;
+                }
+                if writer.is_some() {
+                    let written_size = writer.as_mut().unwrap().size();
+                    // check if the key causes layer overflow or contains a hole...
+                    if is_dup_layer
+                        || dup_end_lsn.is_valid()
+                        || written_size + key_values_total_size > target_file_size
+                    {
+                        // ... if so, flush the previous layer and prepare to write a new one
+                        new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
+                        writer = None;
+                    }
+                }
+                // Remember the size of the key value because at the next iteration we will access the next item
+                key_values_total_size = next_key_size;
+            }
+            if writer.is_none() {
+                // Create a writer if not initialized yet
+                writer = Some(DeltaLayerWriter::new(
+                    self.conf,
+                    self.timeline_id,
+                    self.tenant_id,
+                    key,
+                    if dup_end_lsn.is_valid() {
+                        // this is a layer containing a slice of values of the same key
+                        debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
+                        dup_start_lsn..dup_end_lsn
+                    } else {
+                        debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
+                        lsn_range.clone()
+                    },
+                )?);
+            }
+
+            fail_point!("delta-layer-writer-fail-before-finish", |_| {
+                Err(anyhow::anyhow!("failpoint delta-layer-writer-fail-before-finish").into())
+            });
+
+            writer.as_mut().unwrap().put_value(key, lsn, value)?;
+            prev_key = Some(key);
+        }
+        if let Some(writer) = writer {
+            new_layers.push(writer.finish(prev_key.unwrap().next())?);
+        }
+
+        // Sync layers
+        if !new_layers.is_empty() {
+            let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
+
+            // Fsync all the layer files and directory using multiple threads to
+            // minimize latency.
+            par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
+
+            par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
+                .context("fsync of timeline dir")?;
+
+            layer_paths.pop().unwrap();
+        }
+
+        drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
+
+        Ok(None)
+    }
+
+    ///
+    /// Tiered Compaction for level > 0
+    async fn compact_tiered(
+        self: &Arc<Self>,
+        layer_removal_cs: DeleteGuard,
+        target_file_size: u64,
+        ctx: &RequestContext,
+    ) -> Result<(), CompactionError> {
+        let Some(CompactTieredPhase1Result {
+            new_layers,
+            new_tier_at,
+            removed_tiers,
+        }) = self
+            .compact_tiered_phase1(layer_removal_cs.clone(), target_file_size, ctx)
+            .await? else { return Ok(()); };
+
+        // Before deleting any layers, we need to wait for their upload ops to finish.
+        // See storage_sync module level comment on consistency.
+        // Do it here because we don't want to hold self.layers.write() while waiting.
+        if let Some(remote_client) = &self.remote_client {
+            debug!("waiting for upload ops to complete");
+            remote_client
+                .wait_completion()
+                .await
+                .context("wait for layer upload ops to complete")?;
+        }
+
+        let mut guard = self.layers.write().await;
+        let (layers, _) = &mut *guard;
+        let mut updates = layers.batch_update();
+
+        // TODO: need manifest to ensure correctness
+        let mut new_sorted_runs = Vec::new();
+        let mut new_tier_at_index = None;
+        let mut layers_to_delete = vec![];
+        let mut layer_names_to_delete = vec![];
+        for (tier_id, tier) in updates.sorted_runs() {
+            if *tier_id == new_tier_at {
+                new_tier_at_index = Some(new_sorted_runs.len());
+            }
+            if !removed_tiers.contains(tier_id) {
+                new_sorted_runs.push((*tier_id, tier.clone()));
+            } else {
+                for layer in tier {
+                    layers_to_delete.push(layer.clone());
+                }
+            }
+        }
+
+        for layer in layers_to_delete {
+            layer_names_to_delete.push(layer.filename());
+            self.delete_historic_layer_new(layer_removal_cs.clone(), layer, &mut updates)?;
+        }
+
+        let new_tier_at_index = new_tier_at_index.unwrap();
+
+        let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
+        let mut new_layer_descs = vec![];
+        for l in new_layers {
+            let new_path = l.path();
+
+            let metadata = new_path.metadata().with_context(|| {
+                format!(
+                    "read file metadata for new created layer {}",
+                    new_path.display()
+                )
+            })?;
+
+            if let Some(remote_client) = &self.remote_client {
+                remote_client.schedule_layer_file_upload(
+                    &l.filename(),
+                    &LayerFileMetadata::new(metadata.len()),
+                )?;
+            }
+
+            // update the timeline's physical size
+            self.metrics
+                .resident_physical_size_gauge
+                .add(metadata.len());
+
+            new_layer_paths.insert(new_path, LayerFileMetadata::new(metadata.len()));
+            let x: Arc<dyn PersistentLayer> = Arc::new(l);
+            x.access_stats().record_residence_event(
+                &updates,
+                LayerResidenceStatus::Resident,
+                LayerResidenceEventReason::LayerCreate,
+            );
+            updates.insert_historic_new(x.layer_desc().clone());
+            new_layer_descs.push(x.layer_desc().clone().into());
+            self.lcache.create_new_layer(x);
+        }
+
+        let new_tier_id = updates.next_tier_id();
+        new_sorted_runs.insert(new_tier_at_index, (new_tier_id, new_layer_descs));
+        *updates.sorted_runs() = new_sorted_runs;
+
+        updates.flush();
+        drop_wlock(guard);
+
+        // Also schedule the deletions in remote storage
+        if let Some(remote_client) = &self.remote_client {
+            remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
+        }
+
+        Ok(())
+    }
+
     /// Update information about which layer files need to be retained on
     /// garbage collection. This is separate from actually performing the GC,
     /// and is updated more frequently, so that compaction can remove obsolete