Pevent creation of empty layers with duplicates (#2327)

* Pevent creation of empty layers with duplicates

* Add comments
This commit is contained in:
Konstantin Knizhnik
2022-09-01 21:51:48 +03:00
committed by GitHub
parent 15c5f3e6cf
commit 40813adba2

View File

@@ -1795,47 +1795,54 @@ impl Timeline {
if !same_key {
dup_end_lsn = Lsn::INVALID;
}
// Determine size occupied by this key. We stop at next key, or when size becomes larger than target_file_size
// Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
next_key_size = next_size;
if key != next_key {
if dup_end_lsn.is_valid() {
dup_start_lsn = dup_end_lsn;
dup_end_lsn = lsn_range.end;
// We are writting segment with duplicates:
// place all remaining values of this key in separate segment
dup_start_lsn = dup_end_lsn; // new segments starts where old stops
dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
}
break;
}
key_values_total_size += next_size;
if key_values_total_size > target_file_size {
// split key between multiple layers: such layer can contain only single key
// Check if it is time to split segment: if total keys size is larger than target file size.
// We need to avoid generation of empty segments if next_size > target_file_size.
if key_values_total_size > target_file_size && lsn != next_lsn {
// Split key between multiple layers: such layer can contain only single key
dup_start_lsn = if dup_end_lsn.is_valid() {
dup_end_lsn
dup_end_lsn // new segment with duplicates starts where old one stops
} else {
lsn
lsn // start with the first LSN for this key
};
dup_end_lsn = next_lsn;
dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
break;
}
}
// handle case when loop reaches last key
// handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
dup_start_lsn = dup_end_lsn;
dup_end_lsn = lsn_range.end;
}
if writer.is_some() {
let written_size = writer.as_mut().unwrap().size();
// check if key cause layer overflow
// check if key cause layer overflow...
if is_dup_layer
|| dup_end_lsn.is_valid()
|| written_size + key_values_total_size > target_file_size
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
writer = None;
}
}
// Remember size of key value because at next iteration we will access next item
key_values_total_size = next_key_size;
}
if writer.is_none() {
// Create writer if not initiaized yet
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timeline_id,