mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 16:40:38 +00:00
Tiered compaction: cut deltas along lsn as well if needed (#7671)
In general, tiered compaction is splitting delta layers along the key dimension, but this can only continue until a single key is reached: if the changes from a single key don't fit into one layer file, we used to create layer files of unbounded sizes. This patch implements the method listed as TODO/FIXME in the source code. It does the following things: * Make `accum_key_values` take the target size and if one key's modifications exceed it, make it fill `partition_lsns`, a vector of lsns to use for partitioning. * Have `retile_deltas` use that `partition_lsns` to create delta layers separated by lsn. * Adjust the `test_many_updates_for_single_key` to allow layer files below 0.5 the target size. This situation can create arbitarily small layer files: The amount of data is arbitrary that sits between having just cut a new delta, and then stumbling upon the key that needs to be split along lsn. This data will end up in a dedicated layer and it can be arbitrarily small. * Ignore single-key delta layers for depth calculation: in theory we might have only single-key delta layers in a tier, and this might confuse depth calculation as well, but this should be unlikely. Fixes #7243 Part of #7554 --------- Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
@@ -530,8 +530,6 @@ where
|
||||
// If we have accumulated only a narrow band of keyspace, create an
|
||||
// image layer. Otherwise write a delta layer.
|
||||
|
||||
// FIXME: deal with the case of lots of values for same key
|
||||
|
||||
// FIXME: we are ignoring images here. Did we already divide the work
|
||||
// so that we won't encounter them here?
|
||||
|
||||
@@ -550,39 +548,94 @@ where
|
||||
let mut new_jobs = Vec::new();
|
||||
|
||||
// Slide a window through the keyspace
|
||||
let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
|
||||
let mut key_accum =
|
||||
std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size));
|
||||
let mut all_in_window: bool = false;
|
||||
let mut window = Window::new();
|
||||
|
||||
// Helper function to create a job for a new delta layer with given key-lsn
|
||||
// rectangle.
|
||||
let create_delta_job = |key_range, lsn_range: &Range<Lsn>, new_jobs: &mut Vec<_>| {
|
||||
// The inputs for the job are all the input layers of the original job that
|
||||
// overlap with the rectangle.
|
||||
let batch_layers: Vec<LayerId> = job
|
||||
.input_layers
|
||||
.iter()
|
||||
.filter(|layer_id| {
|
||||
overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
assert!(!batch_layers.is_empty());
|
||||
new_jobs.push(CompactionJob {
|
||||
key_range,
|
||||
lsn_range: lsn_range.clone(),
|
||||
strategy: CompactionStrategy::CreateDelta,
|
||||
input_layers: batch_layers,
|
||||
completed: false,
|
||||
});
|
||||
};
|
||||
|
||||
loop {
|
||||
if all_in_window && window.elems.is_empty() {
|
||||
if all_in_window && window.is_empty() {
|
||||
// All done!
|
||||
break;
|
||||
}
|
||||
|
||||
// If we now have enough keyspace for next delta layer in the window, create a
|
||||
// new delta layer
|
||||
if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
|
||||
{
|
||||
let batch_layers: Vec<LayerId> = job
|
||||
.input_layers
|
||||
.iter()
|
||||
.filter(|layer_id| {
|
||||
overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
assert!(!batch_layers.is_empty());
|
||||
new_jobs.push(CompactionJob {
|
||||
key_range,
|
||||
lsn_range: job.lsn_range.clone(),
|
||||
strategy: CompactionStrategy::CreateDelta,
|
||||
input_layers: batch_layers,
|
||||
completed: false,
|
||||
});
|
||||
} else {
|
||||
assert!(!all_in_window);
|
||||
if let Some(next_key) = key_accum.next().await.transpose()? {
|
||||
window.feed(next_key.key, next_key.size);
|
||||
} else {
|
||||
create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
|
||||
continue;
|
||||
}
|
||||
assert!(!all_in_window);
|
||||
|
||||
// Process next key in the key space
|
||||
match key_accum.next().await.transpose()? {
|
||||
None => {
|
||||
all_in_window = true;
|
||||
}
|
||||
Some(next_key) if next_key.partition_lsns.is_empty() => {
|
||||
// Normal case: extend the window by the key
|
||||
window.feed(next_key.key, next_key.size);
|
||||
}
|
||||
Some(next_key) => {
|
||||
// A key with too large size impact for a single delta layer. This
|
||||
// case occurs if you make a huge number of updates for a single key.
|
||||
//
|
||||
// Drain the window with has_more = false to make a clean cut before
|
||||
// the key, and then make dedicated delta layers for the single key.
|
||||
//
|
||||
// We cannot cluster the key with the others, because we don't want
|
||||
// layer files to overlap with each other in the lsn,key space (no
|
||||
// overlaps for the rectangles).
|
||||
let key = next_key.key;
|
||||
debug!("key {key} with size impact larger than the layer size");
|
||||
while !window.is_empty() {
|
||||
let has_more = false;
|
||||
let key_range = window.choose_next_delta(self.target_file_size, has_more)
|
||||
.expect("with has_more==false, choose_next_delta always returns something for a non-empty Window");
|
||||
create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
|
||||
}
|
||||
|
||||
// Not really required: but here for future resilience:
|
||||
// We make a "gap" here, so any structure the window holds should
|
||||
// probably be reset.
|
||||
window = Window::new();
|
||||
|
||||
let mut prior_lsn = job.lsn_range.start;
|
||||
let mut lsn_ranges = Vec::new();
|
||||
for (lsn, _size) in next_key.partition_lsns.iter() {
|
||||
lsn_ranges.push(prior_lsn..*lsn);
|
||||
prior_lsn = *lsn;
|
||||
}
|
||||
lsn_ranges.push(prior_lsn..job.lsn_range.end);
|
||||
for lsn_range in lsn_ranges {
|
||||
let key_range = key..key.next();
|
||||
create_delta_job(key_range, &lsn_range, &mut new_jobs);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -803,6 +856,10 @@ where
|
||||
self.elems.front().unwrap().accum_size - self.splitoff_size
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.elems.is_empty()
|
||||
}
|
||||
|
||||
fn commit_upto(&mut self, mut upto: usize) {
|
||||
while upto > 1 {
|
||||
let popped = self.elems.pop_front().unwrap();
|
||||
|
||||
@@ -235,9 +235,14 @@ pub struct KeySize<K> {
|
||||
pub key: K,
|
||||
pub num_values: u64,
|
||||
pub size: u64,
|
||||
/// The lsns to partition at (if empty then no per-lsn partitioning)
|
||||
pub partition_lsns: Vec<(Lsn, u64)>,
|
||||
}
|
||||
|
||||
pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
|
||||
pub fn accum_key_values<'a, I, K, D, E>(
|
||||
input: I,
|
||||
target_size: u64,
|
||||
) -> impl Stream<Item = Result<KeySize<K>, E>>
|
||||
where
|
||||
K: Eq + PartialOrd + Display + Copy,
|
||||
I: Stream<Item = Result<D, E>>,
|
||||
@@ -249,25 +254,35 @@ where
|
||||
|
||||
if let Some(first) = input.next().await {
|
||||
let first = first?;
|
||||
let mut part_size = first.size();
|
||||
let mut accum: KeySize<K> = KeySize {
|
||||
key: first.key(),
|
||||
num_values: 1,
|
||||
size: first.size(),
|
||||
size: part_size,
|
||||
partition_lsns: Vec::new(),
|
||||
};
|
||||
let mut last_key = accum.key;
|
||||
while let Some(this) = input.next().await {
|
||||
let this = this?;
|
||||
if this.key() == accum.key {
|
||||
accum.size += this.size();
|
||||
let add_size = this.size();
|
||||
if part_size + add_size > target_size {
|
||||
accum.partition_lsns.push((this.lsn(), part_size));
|
||||
part_size = 0;
|
||||
}
|
||||
part_size += add_size;
|
||||
accum.size += add_size;
|
||||
accum.num_values += 1;
|
||||
} else {
|
||||
assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
|
||||
last_key = accum.key;
|
||||
yield accum;
|
||||
part_size = this.size();
|
||||
accum = KeySize {
|
||||
key: this.key(),
|
||||
num_values: 1,
|
||||
size: this.size(),
|
||||
size: part_size,
|
||||
partition_lsns: Vec::new(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,6 +184,12 @@ impl<L> Level<L> {
|
||||
}
|
||||
let mut events: Vec<Event<K>> = Vec::new();
|
||||
for (idx, l) in self.layers.iter().enumerate() {
|
||||
let key_range = l.key_range();
|
||||
if key_range.end == key_range.start.next() && l.is_delta() {
|
||||
// Ignore single-key delta layers as they can be stacked on top of each other
|
||||
// as that is the only way to cut further.
|
||||
continue;
|
||||
}
|
||||
events.push(Event {
|
||||
key: l.key_range().start,
|
||||
layer_idx: idx,
|
||||
|
||||
@@ -20,10 +20,6 @@ pub(crate) fn setup_logging() {
|
||||
/// even if we produce an extremely narrow delta layer, spanning just that one
|
||||
/// key, we still too many records to fit in the target file size. We need to
|
||||
/// split in the LSN dimension too in that case.
|
||||
///
|
||||
/// TODO: The code to avoid this problem has not been implemented yet! So the
|
||||
/// assertion currently fails, but we need to make it not fail.
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn test_many_updates_for_single_key() {
|
||||
setup_logging();
|
||||
@@ -43,9 +39,9 @@ async fn test_many_updates_for_single_key() {
|
||||
}
|
||||
for l in executor.live_layers.iter() {
|
||||
assert!(l.file_size() < executor.target_file_size * 2);
|
||||
// sanity check that none of the delta layers are stupidly small either
|
||||
// Sanity check that none of the delta layers are empty either.
|
||||
if l.is_delta() {
|
||||
assert!(l.file_size() > executor.target_file_size / 2);
|
||||
assert!(l.file_size() > 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user