compaction PoC: subcompaction (#4656)

This PR adds subcompaction support to the compaction PoC. A compaction
job of roughly 4 GB or more is split across four parallel subcompaction tasks.
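
In the code, the split kicks in when the job's total input size exceeds 16x the target file size; with a 256 MB target (an assumption, not stated in this diff) that works out to the 4 GB above. A minimal sketch of how the split points are chosen, using a hypothetical LayerDesc in place of the pageserver's layer descriptors:

    use std::ops::Range;

    // Hypothetical stand-in for a persistent layer descriptor.
    struct LayerDesc {
        key_range: Range<u64>,
        file_size: u64,
    }

    const SUBCOMPACTION_JOBS: usize = 4;

    /// Pick up to SUBCOMPACTION_JOBS key-range partitions of roughly equal
    /// input size. `layers` must be non-empty and sorted by key_range.start.
    fn partition_points(layers: &[LayerDesc]) -> Vec<u64> {
        let total_size: u64 = layers.iter().map(|l| l.file_size).sum();
        let size_per_partition = total_size / SUBCOMPACTION_JOBS as u64;
        let mut partitions = vec![layers[0].key_range.start];
        let mut current_size = 0;
        for layer in layers {
            if current_size >= size_per_partition {
                partitions.push(layer.key_range.start);
                if partitions.len() >= SUBCOMPACTION_JOBS {
                    break;
                }
                // carry the overshoot instead of resetting, as in the diff
                current_size -= size_per_partition;
            }
            current_size += layer.file_size;
        }
        // Close the last partition at the maximum end key.
        partitions.push(layers.iter().map(|l| l.key_range.end).max().unwrap());
        partitions
    }

    fn main() {
        let layers: Vec<LayerDesc> = (0..8u64)
            .map(|i| LayerDesc { key_range: i * 10..(i + 1) * 10, file_size: 1 << 20 })
            .collect();
        println!("{:?}", partition_points(&layers)); // [0, 20, 40, 60, 80]
    }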

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Alex Chi Z
2023-07-07 12:13:44 -04:00
committed by GitHub
parent 7e7cdaa3eb
commit f31cc2394b
4 changed files with 361 additions and 234 deletions


@@ -53,6 +53,24 @@ pub enum StorageTimeOperation {
CreateTenant,
}
+ pub static NUM_TIERS: Lazy<IntGaugeVec> = Lazy::new(|| {
+ register_int_gauge_vec!(
+ "pageserver_storage_tiers_num",
+ "Number of sorted runs",
+ &["tenant_id", "timeline_id"],
+ )
+ .expect("failed to define a metric")
+ });
+ pub static NUM_COMPACTIONS: Lazy<IntGaugeVec> = Lazy::new(|| {
+ register_int_gauge_vec!(
+ "pageserver_storage_compaction_num",
+ "Number of ongoing compactions",
+ &["tenant_id", "timeline_id"],
+ )
+ .expect("failed to define a metric")
+ });
pub static STORAGE_PHYSICAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_storage_physical_size_sum",
@@ -784,6 +802,8 @@ pub struct TimelineMetrics {
pub persistent_bytes_written: IntCounter,
pub evictions: IntCounter,
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
+ pub num_tiers: IntGauge,
+ pub num_compactions: IntGauge,
}
impl TimelineMetrics {
@@ -849,6 +869,12 @@ impl TimelineMetrics {
.unwrap();
let evictions_with_low_residence_duration =
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
+ let num_tiers = NUM_TIERS
+ .get_metric_with_label_values(&[&tenant_id, &timeline_id])
+ .unwrap();
+ let num_compactions = NUM_COMPACTIONS
+ .get_metric_with_label_values(&[&tenant_id, &timeline_id])
+ .unwrap();
TimelineMetrics {
tenant_id,
@@ -875,6 +901,8 @@ impl TimelineMetrics {
evictions_with_low_residence_duration,
),
read_num_fs_layers,
+ num_tiers,
+ num_compactions,
}
}
}
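
For context, the hunks above follow the standard prometheus-crate lifecycle: register a labeled gauge vector once, resolve one gauge per (tenant_id, timeline_id) pair at timeline creation, and cache it for cheap updates. A self-contained sketch of that pattern, using the same APIs as the diff:

    use once_cell::sync::Lazy;
    use prometheus::{register_int_gauge_vec, IntGaugeVec};

    static NUM_TIERS: Lazy<IntGaugeVec> = Lazy::new(|| {
        register_int_gauge_vec!(
            "pageserver_storage_tiers_num",
            "Number of sorted runs",
            &["tenant_id", "timeline_id"]
        )
        .expect("failed to define a metric")
    });

    fn main() {
        // Resolve the per-timeline gauge once and cache it, as TimelineMetrics does.
        let num_tiers = NUM_TIERS
            .get_metric_with_label_values(&["tenant-a", "timeline-1"])
            .unwrap();
        num_tiers.set(3);
        assert_eq!(num_tiers.get(), 3);
    }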


@@ -150,8 +150,8 @@ impl Layer for InMemoryLayer {
.unwrap_or_default();
println!(
"----- in-memory layer for tli {} LSNs {}-{} ----",
self.timeline_id, self.start_lsn, end_str,
"----- in-memory layer LSNs {}-{} ----",
self.start_lsn, end_str,
);
if !verbose {


@@ -173,9 +173,7 @@ impl PersistentLayerDesc {
pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
println!(
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} size {} is_delta {} is_incremental {} ----",
self.tenant_id,
self.timeline_id,
"----- layer for keys {}-{} lsn {}-{} size {} is_delta {} is_incremental {} ----",
self.key_range.start,
self.key_range.end,
self.lsn_range.start,


@@ -504,6 +504,16 @@ pub enum LogicalSizeCalculationCause {
TenantSizeHandler,
}
+ fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
+ !(a.end <= b.start || b.end <= a.start)
+ }
+ /// a contains b
+ /// ---- a -----
+ /// -- b --
+ fn contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
+ b.start >= a.start && b.end <= a.end
+ }
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
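
Both helpers treat ranges as half-open, so two ranges that merely touch at an endpoint do not overlap. A standalone copy of the two functions with a few illustrative assertions:

    use std::ops::Range;

    fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
        !(a.end <= b.start || b.end <= a.start)
    }

    fn contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
        b.start >= a.start && b.end <= a.end
    }

    fn main() {
        assert!(overlaps_with(&(0..10), &(5..15)));
        assert!(!overlaps_with(&(0..10), &(10..20))); // touching endpoints: no overlap
        assert!(contains(&(0..10), &(2..8)));
        assert!(!contains(&(0..10), &(5..15)));
    }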
@@ -3155,6 +3165,10 @@ impl Timeline {
self.lcache.create_new_layer(l);
batch_updates.flush();
+ self.metrics
+ .num_tiers
+ .set(layers.sorted_runs.num_of_tiers() as i64);
// update the timeline's physical size
self.metrics.resident_physical_size_gauge.add(sz);
// update metrics
@@ -3816,6 +3830,9 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
+ self.metrics.num_compactions.inc();
+ scopeguard::defer!(self.metrics.num_compactions.dec());
if ENABLE_TIERED_COMPACTION {
return self
.compact_tiered(layer_removal_cs, target_file_size, ctx)
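
The inc()/deferred dec() pair keeps the gauge accurate on every exit path, including the early return into compact_tiered and any error path, because scopeguard::defer! runs its body when the enclosing scope is left, normally or by unwinding. A minimal sketch of the same idiom with a locally created gauge:

    use prometheus::IntGauge;

    fn do_compaction(num_compactions: &IntGauge) -> Result<(), String> {
        num_compactions.inc();
        // Runs on every exit path: normal return, early return, or error.
        scopeguard::defer!(num_compactions.dec());
        // ... compaction work that may return early or fail ...
        Err("simulated failure".to_string())
    }

    fn main() {
        let gauge = IntGauge::new("ongoing", "ongoing compactions").unwrap();
        let _ = do_compaction(&gauge);
        assert_eq!(gauge.get(), 0); // decremented despite the error
    }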
@@ -4009,7 +4026,7 @@ impl Timeline {
}
async fn compact_tiered_phase1(
- &self,
+ self: &Arc<Self>,
_layer_removal_cs: DeleteGuardRead,
target_file_size: u64,
ctx: &RequestContext,
@@ -4079,7 +4096,7 @@ impl Timeline {
// compute deltas that can be trivially moved
let mut deltas_to_compact_layers = vec![];
let mut trivial_move_layers = vec![];
- for (idx, (image_layers, delta_layers)) in layers_in_tier.into_iter().enumerate() {
+ for (idx, (_, delta_layers)) in layers_in_tier.iter().enumerate() {
let range_to_check = {
let start = layers_range
.iter()
@@ -4097,31 +4114,56 @@ impl Timeline {
.unwrap();
start..end
};
- fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
- !(a.end <= b.start || b.end <= a.start)
- }
- /// a contains b
- /// ---- a -----
- /// -- b --
- fn contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
- b.start >= a.start && b.end <= a.end
- }
for layer in delta_layers.into_iter() {
- if overlaps_with(&range_to_check, &layer.get_key_range()) {
+ if overlaps_with(&range_to_check, &layer.key_range) {
// compact if overlaps
- deltas_to_compact_layers.push(layer);
+ deltas_to_compact_layers.push(layer.clone());
} else {
// if delta layer does not overlap, trivial move
- trivial_move_layers.push(layer);
+ trivial_move_layers.push(layer.clone());
}
}
- for layer in image_layers.into_iter() {
- if contains(&range_to_check, &layer.get_key_range()) {
}
+ // no delta layers overlap; return directly
+ if deltas_to_compact_layers.is_empty() {
+ for (idx, (image_layers, _)) in layers_in_tier.iter().enumerate() {
+ trivial_move_layers.extend(image_layers.iter().cloned());
+ }
+ return Ok(Some(CompactTieredPhase1Result {
+ new_layers: vec![],
+ new_tier_at: *tier_to_compact.last().unwrap(),
+ removed_tiers: tier_to_compact,
+ trivial_move_layers: trivial_move_layers
+ .iter()
+ .map(|x| self.lcache.get_from_desc(x))
+ .collect_vec(),
+ }));
+ }
+ // otherwise, find the image layers contained in the compacting key range
+ let compacting_key_range = {
+ let start = deltas_to_compact_layers
+ .iter()
+ .map(|x| x.key_range.start)
+ .min()
+ .unwrap();
+ let end = deltas_to_compact_layers
+ .iter()
+ .map(|x| x.key_range.end)
+ .max()
+ .unwrap();
+ start..end
+ };
+ for (idx, (image_layers, _)) in layers_in_tier.iter().enumerate() {
+ for layer in image_layers {
+ if contains(&compacting_key_range, &layer.key_range) {
// if image layer is within compaction range, remove it
- deltas_to_compact_layers.push(layer);
+ deltas_to_compact_layers.push(layer.clone());
} else {
// otherwise, trivially move
- trivial_move_layers.push(layer);
+ trivial_move_layers.push(layer.clone());
}
}
}
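
To restate the classification above: a delta layer that overlaps the compaction key range at all must be rewritten, while an image layer is rewritten only when it lies fully inside the key range spanned by the deltas being compacted; everything else is trivially moved. A sketch of the rule as a standalone function, again over a hypothetical LayerDesc:

    use std::ops::Range;

    #[derive(Clone)]
    struct LayerDesc { key_range: Range<u64>, is_delta: bool }

    fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
        !(a.end <= b.start || b.end <= a.start)
    }
    fn contains<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
        b.start >= a.start && b.end <= a.end
    }

    /// Split layers into (to_compact, trivial_move) for one compaction range.
    fn classify(layers: &[LayerDesc], range: &Range<u64>) -> (Vec<LayerDesc>, Vec<LayerDesc>) {
        let (mut to_compact, mut trivial_move) = (vec![], vec![]);
        for l in layers {
            let rewrite = if l.is_delta {
                overlaps_with(range, &l.key_range) // deltas: any overlap forces a rewrite
            } else {
                contains(range, &l.key_range) // images: only when fully contained
            };
            if rewrite { to_compact.push(l.clone()) } else { trivial_move.push(l.clone()) }
        }
        (to_compact, trivial_move)
    }

Containment rather than overlap for images presumably avoids rewriting image data that extends beyond the compacted deltas' key range.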
@@ -4129,7 +4171,6 @@ impl Timeline {
let deltas_to_compact_layers = deltas_to_compact_layers
.into_iter()
.map(|l| self.lcache.get_from_desc(&l))
- .filter(|l| l.layer_desc().is_delta())
.collect_vec();
let lsn_range = {
@@ -4164,221 +4205,279 @@ impl Timeline {
)
};
- if deltas_to_compact_layers.is_empty() {
- return Ok(Some(CompactTieredPhase1Result {
- new_layers: vec![],
- new_tier_at: *tier_to_compact.last().unwrap(),
- removed_tiers: tier_to_compact,
- trivial_move_layers,
- }));
+ async fn run_compaction_for_range(
+ tl: Arc<Timeline>,
+ deltas_to_compact_layers: Vec<Arc<dyn PersistentLayer>>,
+ ctx: &RequestContext,
+ key_range: Option<Range<Key>>,
+ lsn_range: Range<Lsn>,
+ target_file_size: u64,
+ ) -> Result<Vec<Arc<dyn PersistentLayer>>, CompactionError> {
+ let deltas_to_compact_layers = deltas_to_compact_layers
+ .into_iter()
+ .filter(|x| x.layer_desc().is_delta())
+ .collect_vec();
+ // This iterator walks through all key-value pairs from all the layers
+ // we're compacting, in key, LSN order.
+ let all_values_iter = itertools::process_results(
+ deltas_to_compact_layers.iter().map(|l| l.iter(ctx)),
+ |iter_iter| {
+ iter_iter.kmerge_by(|a, b| {
+ if let Ok((a_key, a_lsn, _)) = a {
+ if let Ok((b_key, b_lsn, _)) = b {
+ match a_key.cmp(b_key) {
+ Ordering::Less => true,
+ Ordering::Equal => a_lsn < b_lsn,
+ Ordering::Greater => false,
+ }
+ } else {
+ false
+ }
+ } else {
+ true
+ }
+ })
+ },
+ )?;
+ // TODO(chi): support image layer generation
+ // TODO(chi): merge with compact l0
+ let mut new_layers: Vec<Arc<dyn PersistentLayer>> = Vec::new();
+ let mut prev_key: Option<Key> = None;
+ let mut prev_image_key: Option<Key> = None;
+ let mut writer: Option<DeltaLayerWriter> = None;
+ let mut image_writer: Option<ImageLayerWriter> = None;
+ let mut same_key_cnt = 0;
+ let mut construct_image_for_key = false;
+ let image_lsn = Lsn(lsn_range.end.0 - 1);
+ const PAGE_MATERIALIZE_THRESHOLD: usize = 16;
+ for x in all_values_iter {
+ let (key, lsn, value) = x?;
+ if let Some(ref key_range) = key_range {
+ if key < key_range.start {
+ continue;
+ }
+ if key >= key_range.end {
+ break;
+ }
+ }
+ let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
+ if same_key {
+ same_key_cnt += 1;
+ } else {
+ same_key_cnt = 1;
+ construct_image_for_key = false;
+ }
+ // We need to check key boundaries once we reach next key or end of layer with the same key
+ if !same_key {
+ // TODO: handle the case where updates to one page exceed the layer file target size, as in
+ // L0 compaction.
+ if writer.is_some() {
+ let written_size = writer.as_mut().unwrap().size();
+ // check if this key causes layer overflow...
+ if written_size > target_file_size {
+ // ... if so, flush previous layer and prepare to write new one
+ new_layers.push(Arc::new(
+ writer.take().unwrap().finish(prev_key.unwrap().next())?,
+ ));
+ // only write image layer when we end a delta layer
+ if image_writer.is_some() {
+ let image_writer_mut = image_writer.as_mut().unwrap();
+ let written_size: u64 = image_writer_mut.size();
+ if written_size > target_file_size {
+ new_layers.push(Arc::new(
+ image_writer
+ .take()
+ .unwrap()
+ .finish(prev_image_key.unwrap().next())?,
+ ));
+ image_writer = None; // this is redundant
+ }
+ }
+ writer = None; // this is redundant
+ }
+ }
+ }
+ if writer.is_none() {
+ // Create writer if not initialized yet
+ writer = Some(DeltaLayerWriter::new(
+ tl.conf,
+ tl.timeline_id,
+ tl.tenant_id,
+ key,
+ lsn_range.clone(),
+ )?);
+ }
+ fail_point!("delta-layer-writer-fail-before-finish", |_| {
+ Err(anyhow::anyhow!("failpoint delta-layer-writer-fail-before-finish").into())
+ });
+ if same_key_cnt >= PAGE_MATERIALIZE_THRESHOLD {
+ assert!(lsn <= image_lsn);
+ if !construct_image_for_key {
+ let img = match tl.get(key, image_lsn, ctx).await {
+ Ok(img) => img,
+ Err(err) => {
+ if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
+ warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
+ ZERO_PAGE.clone()
+ } else {
+ return Err(CompactionError::Other(err.into()));
+ }
+ }
+ };
+ if image_writer.is_none() {
+ // Create writer if not initialized yet
+ image_writer = Some(ImageLayerWriter::new(
+ tl.conf,
+ tl.timeline_id,
+ tl.tenant_id,
+ // TODO(chi): should not use the full key range
+ key.clone(),
+ image_lsn,
+ true,
+ )?);
+ }
+ let image_writer_mut = image_writer.as_mut().unwrap();
+ image_writer_mut.put_image(key, &img)?;
+ construct_image_for_key = true;
+ prev_image_key = Some(key);
+ }
+ }
+ writer.as_mut().unwrap().put_value(key, lsn, value)?;
+ prev_key = Some(key);
+ }
+ if let Some(writer) = writer {
+ new_layers.push(Arc::new(writer.finish(prev_key.as_ref().unwrap().next())?));
+ }
+ if let Some(image_writer) = image_writer {
+ new_layers.push(Arc::new(image_writer.finish(prev_key.unwrap().next())?));
+ }
+ if let Some(ref key_range) = key_range {
+ let total_size = new_layers
+ .iter()
+ .map(|x| x.layer_desc().file_size())
+ .sum::<u64>();
+ info!(
+ "subcompaction completed for key range {}..{} with total size of {}",
+ key_range.start, key_range.end, total_size
+ );
+ }
+ Ok(new_layers)
+ }
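
run_compaction_for_range is the per-range worker: it k-merges the per-layer value iterators into a single stream ordered by (key, LSN), optionally clips the stream to key_range, rolls delta layers at target_file_size, and materializes a page image once a key has accumulated PAGE_MATERIALIZE_THRESHOLD versions. The removed block below is the old method-level copy of this merge loop. The merge itself is itertools' kmerge_by, which takes a "comes first" predicate across already-sorted inputs; a toy demonstration of the ordering, with the diff's Result handling omitted:

    use itertools::Itertools;

    fn main() {
        // Each input is already sorted by (key, lsn), like the
        // per-layer value iterators in the diff.
        let layer_a = vec![(1u64, 10u64), (1, 30), (5, 20)];
        let layer_b = vec![(1, 20), (2, 10), (5, 10)];
        let merged: Vec<_> = vec![layer_a, layer_b]
            .into_iter()
            .kmerge_by(|a, b| match a.0.cmp(&b.0) {
                std::cmp::Ordering::Less => true,
                std::cmp::Ordering::Equal => a.1 < b.1,
                std::cmp::Ordering::Greater => false,
            })
            .collect();
        assert_eq!(merged, [(1, 10), (1, 20), (1, 30), (2, 10), (5, 10), (5, 20)]);
    }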
- // This iterator walks through all key-value pairs from all the layers
- // we're compacting, in key, LSN order.
- let all_values_iter = itertools::process_results(
- deltas_to_compact_layers.iter().map(|l| l.iter(ctx)),
- |iter_iter| {
- iter_iter.kmerge_by(|a, b| {
- if let Ok((a_key, a_lsn, _)) = a {
- if let Ok((b_key, b_lsn, _)) = b {
- match a_key.cmp(b_key) {
- Ordering::Less => true,
- Ordering::Equal => a_lsn < b_lsn,
- Ordering::Greater => false,
- }
- } else {
- false
- }
- } else {
- true
- }
- })
- },
- )?;
+ let total_size = deltas_to_compact_layers
+ .iter()
+ .map(|x| x.layer_desc().file_size())
+ .sum::<u64>();
- // This iterator walks through all keys and is needed to calculate size used by each key
- let mut all_keys_iter = itertools::process_results(
- deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)),
- |iter_iter| {
- iter_iter.kmerge_by(|a, b| {
- let (a_key, a_lsn, _) = a;
- let (b_key, b_lsn, _) = b;
- match a_key.cmp(b_key) {
- Ordering::Less => true,
- Ordering::Equal => a_lsn < b_lsn,
- Ordering::Greater => false,
- }
- })
- },
- )?;
+ const SUBCOMPACTION_JOBS: usize = 4;
- // TODO(chi): support image layer generation
- // TODO(chi): merge with compact l0
- let mut new_layers: Vec<Arc<dyn PersistentLayer>> = Vec::new();
- let mut prev_key: Option<Key> = None;
- let mut prev_image_key: Option<Key> = None;
- let mut writer: Option<DeltaLayerWriter> = None;
- let mut image_writer: Option<ImageLayerWriter> = None;
- let mut key_values_total_size = 0u64;
- let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
- let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
- let mut same_key_cnt = 0;
- let mut construct_image_for_key = false;
- let image_lsn = Lsn(lsn_range.end.0 - 1);
- const PAGE_MATERIALIZE_THRESHOLD: usize = 40;
- for x in all_values_iter {
- let (key, lsn, value) = x?;
- let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
- if same_key {
- same_key_cnt += 1;
- } else {
- same_key_cnt = 1;
- construct_image_for_key = false;
- }
- if same_key_cnt >= PAGE_MATERIALIZE_THRESHOLD {
- assert!(lsn <= image_lsn);
- if !construct_image_for_key {
- let img = match self.get(key, image_lsn, ctx).await {
- Ok(img) => img,
- Err(err) => {
- if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
- warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
- ZERO_PAGE.clone()
- } else {
- return Err(CompactionError::Other(err.into()));
- }
- }
- };
- if image_writer.is_none() {
- // Create writer if not initiaized yet
- image_writer = Some(ImageLayerWriter::new(
- self.conf,
- self.timeline_id,
- self.tenant_id,
- // TODO(chi): should not use the full key range
- key.clone(),
- image_lsn,
- true,
- )?);
- }
- let image_writer_mut = image_writer.as_mut().unwrap();
- image_writer_mut.put_image(key, &img)?;
- construct_image_for_key = true;
- prev_image_key = Some(key);
- }
- }
- // We need to check key boundaries once we reach next key or end of layer with the same key
- if !same_key || lsn == dup_end_lsn {
- let mut next_key_size = 0u64;
- let is_dup_layer = dup_end_lsn.is_valid();
- dup_start_lsn = Lsn::INVALID;
- if !same_key {
- dup_end_lsn = Lsn::INVALID;
- }
- // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
- for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
- next_key_size = next_size;
- if key != next_key {
- if dup_end_lsn.is_valid() {
- // We are writting segment with duplicates:
- // place all remaining values of this key in separate segment
- dup_start_lsn = dup_end_lsn; // new segments starts where old stops
- dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
- }
- break;
- }
- key_values_total_size += next_size;
- // Check if it is time to split segment: if total keys size is larger than target file size.
- // We need to avoid generation of empty segments if next_size > target_file_size.
- if key_values_total_size > target_file_size && lsn != next_lsn {
- // Split key between multiple layers: such layer can contain only single key
- dup_start_lsn = if dup_end_lsn.is_valid() {
- dup_end_lsn // new segment with duplicates starts where old one stops
- } else {
- lsn // start with the first LSN for this key
- };
- dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
- break;
- }
- }
- // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
- if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
- dup_start_lsn = dup_end_lsn;
- dup_end_lsn = lsn_range.end;
- }
- if writer.is_some() {
- let written_size = writer.as_mut().unwrap().size();
- // check if key cause layer overflow or contains hole...
- if is_dup_layer
- || dup_end_lsn.is_valid()
- || written_size + key_values_total_size > target_file_size
- {
- // ... if so, flush previous layer and prepare to write new one
- new_layers.push(Arc::new(
- writer.take().unwrap().finish(prev_key.unwrap().next())?,
- ));
- // only write image layer when we end a delta layer
- if image_writer.is_some() {
- let image_writer_mut = image_writer.as_mut().unwrap();
- let written_size: u64 = image_writer_mut.size();
- if written_size + key_values_total_size > target_file_size / 2 {
- new_layers.push(Arc::new(
- image_writer
- .take()
- .unwrap()
- .finish(prev_image_key.unwrap().next())?,
- ));
- image_writer = None; // this is redundant
- }
- }
- writer = None; // this is redundant
- }
- }
- // Remember size of key value because at next iteration we will access next item
- key_values_total_size = next_key_size;
- }
- if writer.is_none() {
- // Create writer if not initiaized yet
- writer = Some(DeltaLayerWriter::new(
- self.conf,
- self.timeline_id,
- self.tenant_id,
- key,
- if dup_end_lsn.is_valid() {
- // this is a layer containing slice of values of the same key
- debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
- dup_start_lsn..dup_end_lsn
- } else {
- debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
- lsn_range.clone()
- },
- )?);
- }
- fail_point!("delta-layer-writer-fail-before-finish", |_| {
- Err(anyhow::anyhow!("failpoint delta-layer-writer-fail-before-finish").into())
+ let new_layers = if total_size > 16 * target_file_size {
+ // find split points
+ let mut deltas_to_compact_layers = deltas_to_compact_layers;
+ deltas_to_compact_layers.sort_by(|a, b| {
+ a.layer_desc()
+ .key_range
+ .start
+ .cmp(&b.layer_desc().key_range.start)
+ });
- writer.as_mut().unwrap().put_value(key, lsn, value)?;
- prev_key = Some(key);
- }
- if let Some(writer) = writer {
- new_layers.push(Arc::new(writer.finish(prev_key.as_ref().unwrap().next())?));
- }
+ let size_per_partition = total_size / SUBCOMPACTION_JOBS as u64;
+ let mut current_size = 0;
+ let mut partitions = vec![];
+ partitions.push(deltas_to_compact_layers[0].layer_desc().key_range.start);
+ for layer in &deltas_to_compact_layers {
+ if current_size >= size_per_partition {
+ partitions.push(layer.layer_desc().key_range.start);
+ if partitions.len() >= SUBCOMPACTION_JOBS {
+ break;
+ }
+ current_size -= size_per_partition;
+ }
+ current_size += layer.layer_desc().file_size();
+ }
+ partitions.push(
+ deltas_to_compact_layers
+ .iter()
+ .map(|x| x.layer_desc().key_range.end)
+ .max()
+ .unwrap(),
+ );
- if let Some(image_writer) = image_writer {
- new_layers.push(Arc::new(image_writer.finish(prev_key.unwrap().next())?));
- }
info!("subcompaction partitions: {}", partitions.iter().join(", "));
// pick overlapping delta files
let mut subcompaction_jobs = vec![];
for i in 1..partitions.len() {
let compact_range = partitions[i - 1]..partitions[i];
let mut sub_deltas_to_compact_layers = vec![];
for layer in &deltas_to_compact_layers {
if overlaps_with(&layer.layer_desc().key_range, &compact_range) {
sub_deltas_to_compact_layers.push(layer.clone());
}
}
subcompaction_jobs.push((compact_range, sub_deltas_to_compact_layers));
}
// parallel compaction
let mut handles = vec![];
for (id, (key_range, deltas_to_compact_layers)) in
subcompaction_jobs.into_iter().enumerate()
{
let ctx = ctx.clone();
let span = span!(Level::INFO, "subcompaction", %id);
let lsn_range = lsn_range.clone();
let this = self.clone();
handles.push(tokio::spawn(
async move {
run_compaction_for_range(
this,
deltas_to_compact_layers,
&ctx,
Some(key_range),
lsn_range,
target_file_size,
)
.await
}
.instrument(span),
));
}
let mut new_layers = vec![];
for handle in handles {
let result = handle.await.unwrap()?;
new_layers.extend(result);
}
new_layers
} else {
run_compaction_for_range(
self.clone(),
deltas_to_compact_layers,
ctx,
None,
lsn_range,
target_file_size,
)
.await?
};
// Sync layers
if !new_layers.is_empty() {
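
The parallel path above is a standard tokio fan-out/fan-in: each partition gets its own instrumented task, and errors surface when the join handles are awaited. A stripped-down sketch of that shape, with a stub worker in place of run_compaction_for_range (assumes the tokio and tracing crates):

    use tracing::{span, Instrument, Level};

    async fn run_range(range: std::ops::Range<u64>) -> Result<Vec<u64>, String> {
        Ok(vec![range.start, range.end]) // stand-in for new layers
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let partitions = [0u64, 20, 40, 60, 80];
        let mut handles = vec![];
        for (id, w) in partitions.windows(2).enumerate() {
            let range = w[0]..w[1];
            let span = span!(Level::INFO, "subcompaction", %id);
            handles.push(tokio::spawn(run_range(range).instrument(span)));
        }
        // Fan-in: a JoinError (task panic) is unwrapped, worker errors propagate with ?.
        let mut new_layers = vec![];
        for handle in handles {
            new_layers.extend(handle.await.unwrap()?);
        }
        println!("{new_layers:?}");
        Ok(())
    }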
@@ -4395,8 +4494,6 @@ impl Timeline {
layer_paths.pop().unwrap();
}
- drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
Ok(Some(CompactTieredPhase1Result {
new_layers,
new_tier_at: *tier_to_compact.last().unwrap(),
@@ -4539,6 +4636,10 @@ impl Timeline {
"compaction complete, removed_tiers = {removed_tiers:?}, new_tier_at = {new_tier_at}"
);
+ self.metrics
+ .num_tiers
+ .set(layers.sorted_runs.num_of_tiers() as i64);
drop_wlock(guard);
// Also schedule the deletions in remote storage