add tiered compaction skeleton

Signed-off-by: Alex Chi <iskyzh@gmail.com>
Alex Chi
2023-06-20 14:43:07 -04:00
parent 9b7747436c
commit 5274f487e4
2 changed files with 362 additions and 26 deletions


@@ -93,6 +93,11 @@ pub struct LayerMap {
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
/// All sorted runs, used by tiered compaction. Each entry is a (tier id, layers in that run) pair.
pub sorted_runs: Vec<(usize, Vec<Arc<PersistentLayerDesc>>)>,
/// Id to assign to the next tier that gets created.
next_tier_id: usize,
}
/// The primary update API for the layer map.
@@ -114,15 +119,34 @@ impl BatchedUpdates<'_> {
///
// TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
unimplemented!("insert_historic");
}
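/// Insert an on-disk layer into the map; replaces `insert_historic`, which is stubbed out above.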
pub fn insert_historic_new(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.insert_historic_noflush(layer_desc)
}
/// Get a mutable reference to the current sorted runs.
pub fn sorted_runs(&mut self) -> &mut Vec<(usize, Vec<Arc<PersistentLayerDesc>>)> {
&mut self.layer_map.sorted_runs
}
pub fn next_tier_id(&mut self) -> usize {
let ret = self.layer_map.next_tier_id;
self.layer_map.next_tier_id += 1;
ret
}
///
/// Remove an on-disk layer from the map.
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
unimplemented!("remove_historic");
}
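/// Remove an on-disk layer from the map; replaces the stubbed-out `remove_historic` above.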
pub fn remove_historic_new(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.remove_historic_noflush(layer_desc)
}
@@ -644,6 +668,22 @@ impl LayerMap {
for layer in self.iter_historic_layers() {
layer.dump(verbose, ctx)?;
}
println!("tiered compaction:");
println!("l0_deltas:");
for layer in &self.l0_delta_layers {
layer.dump(verbose, ctx)?;
}
println!("sorted_runs:");
for (tier_id, layers) in &self.sorted_runs {
println!("tier {}", tier_id);
for layer in layers {
layer.dump(verbose, ctx)?;
}
}
println!("End dump LayerMap");
Ok(())
}


@@ -857,18 +857,6 @@ impl Timeline {
.await
{
Ok((partitioning, lsn)) => {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self
.create_image_layers(&partitioning, lsn, false, ctx)
.await
.map_err(anyhow::Error::from)?;
if let Some(remote_client) = &self.remote_client {
for (path, layer_metadata) in layer_paths_to_upload {
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
}
}
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
@@ -1801,7 +1789,7 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
updates.remove_historic(local_layer.layer_desc().clone());
updates.remove_historic_new(local_layer.layer_desc().clone());
self.lcache.remove_local_when_init(local_layer);
// fall-through to adding the remote layer
}
@@ -2299,9 +2287,20 @@ impl Timeline {
None
}
fn delete_historic_layer(
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: DeleteGuard,
layer: Arc<PersistentLayerDesc>,
_updates: &mut BatchedUpdates<'_>,
) -> anyhow::Result<()> {
warn!("not deleting the layer {layer:?} as old GC is not supposed to run");
return Ok(());
}
/// Removes the layer from local FS (if present) and from memory.
/// Remote storage is not affected by this operation.
fn delete_historic_layer(
fn delete_historic_layer_new(
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: DeleteGuard,
@@ -2322,7 +2321,7 @@ impl Timeline {
// won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(layer.layer_desc().clone());
updates.remove_historic_new(layer.layer_desc().clone());
self.lcache.delete_layer(layer);
Ok(())
@@ -2911,7 +2910,7 @@ impl Timeline {
let (partitioning, _lsn) = self
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
.await?;
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
self.create_bottom_image_layers(&partitioning, self.initdb_lsn, true, ctx)
.await?
} else {
#[cfg(test)]
@@ -2927,7 +2926,7 @@ impl Timeline {
}
}
// normal case, write out a L0 delta layer file.
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
let (delta_path, metadata) = self.create_l0_delta_layer(&frozen_layer).await?;
HashMap::from([(delta_path, metadata)])
};
@@ -3030,7 +3029,7 @@ impl Timeline {
}
// Write out the given frozen in-memory layer as a new L0 delta file
async fn create_delta_layer(
async fn create_l0_delta_layer(
self: &Arc<Self>,
frozen_layer: &Arc<InMemoryLayer>,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
@@ -3081,7 +3080,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(l.layer_desc().clone());
batch_updates.insert_historic_new(l.layer_desc().clone());
self.lcache.create_new_layer(l);
batch_updates.flush();
@@ -3204,7 +3203,7 @@ impl Timeline {
Ok(false)
}
async fn create_image_layers(
async fn create_bottom_image_layers(
&self,
partitioning: &KeyPartitioning,
lsn: Lsn,
@@ -3315,7 +3314,7 @@ impl Timeline {
let (layers, _) = &mut *guard;
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
let mut sorted_run = vec![];
for l in image_layers {
let path = l.filename();
let metadata = timeline_path
@@ -3334,9 +3333,16 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(l.layer_desc().clone());
updates.insert_historic_new(l.layer_desc().clone());
sorted_run.push(Arc::new(l.layer_desc().clone()));
self.lcache.create_new_layer(l);
}
// Record the new image layers as one sorted run. For now this is only done when initializing
// at initdb_lsn, so the list of sorted runs must still be empty.
assert!(updates.sorted_runs().is_empty());
let tier_id = updates.next_tier_id();
updates.sorted_runs().push((tier_id, sorted_run));
updates.flush();
drop_wlock(guard);
timer.stop_and_record();
@@ -3351,6 +3357,13 @@ struct CompactLevel0Phase1Result {
deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
}
#[derive(Default)]
struct CompactTieredPhase1Result {
/// Layers produced by the compaction, forming one new sorted run.
new_layers: Vec<DeltaLayer>,
/// Tier id marking the position in `sorted_runs` where the new run should be inserted.
new_tier_at: usize,
/// Tier ids consumed by the compaction; their layers can be removed afterwards.
removed_tiers: Vec<usize>,
}
/// Top-level failure to compact.
#[derive(Debug)]
enum CompactionError {
@@ -3735,8 +3748,8 @@ impl Timeline {
.await?;
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
return Ok(());
// If L0 does not need to be compacted, look into other layers
return self.compact_tiered(layer_removal_cs, target_file_size, ctx).await;
}
// Before deleting any layers, we need to wait for their upload ops to finish.
@@ -3783,7 +3796,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(x.layer_desc().clone());
updates.insert_historic_new(x.layer_desc().clone());
self.lcache.create_new_layer(x);
}
@@ -3792,7 +3805,7 @@ impl Timeline {
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
layer_names_to_delete.push(l.filename());
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
self.delete_historic_layer_new(layer_removal_cs.clone(), l, &mut updates)?;
}
updates.flush();
drop_wlock(guard);
@@ -3805,6 +3818,289 @@ impl Timeline {
Ok(())
}
async fn compact_tiered_phase1(
&self,
_layer_removal_cs: DeleteGuard,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<Option<CompactTieredPhase1Result>, CompactionError> {
let guard = self.layers.read().await;
let (layers, _) = &*guard;
// Only compact if enough sorted runs have accumulated. The threshold is hard-coded for this
// skeleton; a real implementation would presumably take it from the tenant's compaction config.
let threshold = 8;
if layers.sorted_runs.len() < threshold {
debug!(
sorted_runs = layers.sorted_runs.len(),
threshold, "too few sorted runs to compact"
);
return Ok(None);
}
// Gather the files to compact in this iteration.
// TODO: leverage the property that some layers do not overlap; a full kmerge is too costly
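// NOTE: `deltas_to_compact_layers` and `lsn_range` below are placeholders carried over from
// the L0 compaction path; the tier-selection step that should produce them is not part of
// this skeleton yet.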
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
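// An Err entry sorts before Ok entries in the comparator below, so an error surfaces as soon
// as the merged stream is polled.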
let all_values_iter = itertools::process_results(
deltas_to_compact_layers.iter().map(|l| l.iter(ctx)),
|iter_iter| {
iter_iter.kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
}
} else {
true
}
})
},
)?;
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = itertools::process_results(
deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)),
|iter_iter| {
iter_iter.kmerge_by(|a, b| {
let (a_key, a_lsn, _) = a;
let (b_key, b_lsn, _) = b;
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
})
},
)?;
// TODO(chi): support image layer generation
// TODO(chi): merge with compact l0
let mut new_layers = Vec::new();
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
let mut key_values_total_size = 0u64;
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
for x in all_values_iter {
let (key, lsn, value) = x?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach the next key, or the end of a layer holding the same key
if !same_key || lsn == dup_end_lsn {
let mut next_key_size = 0u64;
let is_dup_layer = dup_end_lsn.is_valid();
dup_start_lsn = Lsn::INVALID;
if !same_key {
dup_end_lsn = Lsn::INVALID;
}
// Determine the size occupied by this key. We stop at the next key, or once the accumulated size exceeds target_file_size
for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
next_key_size = next_size;
if key != next_key {
if dup_end_lsn.is_valid() {
// We are writing a segment with duplicates:
// place all remaining values of this key in a separate segment
dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
dup_end_lsn = lsn_range.end; // there are no more values of this key until the end of the LSN range
}
break;
}
key_values_total_size += next_size;
// Check if it is time to split the segment: the total size of this key's values is larger than the target file size.
// We need to avoid generating empty segments if next_size > target_file_size.
if key_values_total_size > target_file_size && lsn != next_lsn {
// Split the key across multiple layers: such a layer can contain only a single key
dup_start_lsn = if dup_end_lsn.is_valid() {
dup_end_lsn // new segment with duplicates starts where old one stops
} else {
lsn // start with the first LSN for this key
};
dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
break;
}
}
// Handle the case where the loop reached the last key: dup_end_lsn is set but dup_start_lsn is not.
if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
dup_start_lsn = dup_end_lsn;
dup_end_lsn = lsn_range.end;
}
if writer.is_some() {
let written_size = writer.as_mut().unwrap().size();
// check whether this key causes the layer to overflow or contains a hole...
if is_dup_layer
|| dup_end_lsn.is_valid()
|| written_size + key_values_total_size > target_file_size
{
// ... if so, flush the previous layer and prepare to write a new one
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
writer = None;
}
}
// Remember the size of the key value, because at the next iteration we will access the next item
key_values_total_size = next_key_size;
}
if writer.is_none() {
// Create a writer if not initialized yet
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_id,
key,
if dup_end_lsn.is_valid() {
// this is a layer containing a slice of values of the same key
debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
dup_start_lsn..dup_end_lsn
} else {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
)?);
}
fail_point!("delta-layer-writer-fail-before-finish", |_| {
Err(anyhow::anyhow!("failpoint delta-layer-writer-fail-before-finish").into())
});
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
.context("fsync of timeline dir")?;
layer_paths.pop().unwrap();
}
drop(all_keys_iter); // so that deltas_to_compact_layers is no longer borrowed
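// TODO: assemble a CompactTieredPhase1Result from the layers written above once tier
// selection is wired up; for now the skeleton always reports that there is nothing to do.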
Ok(None)
}
///
/// Tiered compaction for levels above L0.
async fn compact_tiered(
self: &Arc<Self>,
layer_removal_cs: DeleteGuard,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
let Some(CompactTieredPhase1Result {
new_layers,
new_tier_at,
removed_tiers
}) = self
.compact_tiered_phase1(layer_removal_cs.clone(), target_file_size, ctx)
.await? else { return Ok(()); };
// Before deleting any layers, we need to wait for their upload ops to finish.
// See storage_sync module level comment on consistency.
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
let mut updates = layers.batch_update();
// TODO: a manifest is needed to ensure correctness
let mut new_sorted_runs = Vec::new();
let mut new_tier_at_index = None;
let mut layers_to_delete = vec![];
let mut layer_names_to_delete = vec![];
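// Rebuild the list of sorted runs: keep the tiers that were not consumed by this compaction,
// remember where the tier identified by `new_tier_at` sits so the new run can be inserted at
// that position, and collect the layers of the consumed tiers for deletion.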
for (tier_id, tier) in updates.sorted_runs() {
if *tier_id == new_tier_at {
new_tier_at_index = Some(new_sorted_runs.len());
}
if !removed_tiers.contains(tier_id) {
new_sorted_runs.push((*tier_id, tier.clone()));
} else {
for layer in tier {
layers_to_delete.push(layer.clone());
}
}
}
for layer in layers_to_delete {
layer_names_to_delete.push(layer.filename());
self.delete_historic_layer_new(layer_removal_cs.clone(), layer, &mut updates)?;
}
let new_tier_at_index = new_tier_at_index.expect("new_tier_at must match an existing tier id");
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
let mut new_layer_descs = vec![];
for l in new_layers {
let new_path = l.path();
let metadata = new_path.metadata().with_context(|| {
format!(
"read file metadata for new created layer {}",
new_path.display()
)
})?;
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_upload(
&l.filename(),
&LayerFileMetadata::new(metadata.len()),
)?;
}
// update the timeline's physical size
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
new_layer_paths.insert(new_path, LayerFileMetadata::new(metadata.len()));
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic_new(x.layer_desc().clone());
new_layer_descs.push(x.layer_desc().clone().into());
self.lcache.create_new_layer(x);
}
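// Register the freshly written layers as one new sorted run, inserted at the position of the
// tier identified by `new_tier_at`.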
let new_tier_id = updates.next_tier_id();
new_sorted_runs.insert(new_tier_at_index, (new_tier_id, new_layer_descs));
*updates.sorted_runs() = new_sorted_runs;
updates.flush();
drop_wlock(guard);
// Also schedule the deletions in remote storage
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
}
Ok(())
}
/// Update information about which layer files need to be retained on
/// garbage collection. This is separate from actually performing the GC,
/// and is updated more frequently, so that compaction can remove obsolete