perf(mito): Use a heap to merge batches for the same key (#2521)

* feat: merge by heap

* fix: fix heap order

* feat: avoid pop/push next and refactor some functions

* feat: replace merge_batches and fix tests

* test: add test that a key is deleted

* fix: skip empty batch

* style: clippy

* chore: fix typos
This commit is contained in:
Yingwen
2023-10-07 10:56:08 +08:00
committed by GitHub
parent 00fe7d104e
commit fe783c7c1f
2 changed files with 328 additions and 64 deletions

View File

@@ -30,10 +30,13 @@ use datatypes::arrow;
use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{DataType, ScalarVector};
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
use datatypes::types::TimestampType;
use datatypes::value::ValueRef;
use datatypes::vectors::{
BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef,
BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
Vector, VectorRef,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
@@ -355,6 +358,47 @@ impl Batch {
.collect()
}
/// Borrows the timestamp column of this batch as a slice of native `i64` values.
///
/// Returns `None` when the batch has no timestamps.
///
/// # Panics
/// Panics if the data type of the timestamp vector is not a second, millisecond,
/// microsecond or nanosecond timestamp.
pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
    // Downcasts `$vector` to the concrete timestamp vector type and borrows the values of
    // the underlying arrow array. The `unwrap()` relies on the caller having matched the
    // data type of the vector first.
    macro_rules! native_values {
        ($vector:expr, $vector_type:ty) => {
            $vector
                .as_any()
                .downcast_ref::<$vector_type>()
                .unwrap()
                .as_arrow()
                .values()
        };
    }

    if self.timestamps.is_empty() {
        return None;
    }

    let native = match self.timestamps.data_type() {
        ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
            native_values!(self.timestamps, TimestampSecondVector)
        }
        ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
            native_values!(self.timestamps, TimestampMillisecondVector)
        }
        ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
            native_values!(self.timestamps, TimestampMicrosecondVector)
        }
        ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
            native_values!(self.timestamps, TimestampNanosecondVector)
        }
        other => panic!("timestamps in a Batch has other type {:?}", other),
    };

    Some(native)
}
/// Takes the batch in place.
fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
@@ -392,7 +436,7 @@ impl Batch {
///
/// # Panics
/// Panics if `index` is out-of-bound or the sequence vector returns null.
fn get_sequence(&self, index: usize) -> SequenceNumber {
pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber {
// Safety: sequences is not null so it actually returns Some.
self.sequences.get_data(index).unwrap()
}
@@ -646,12 +690,13 @@ mod tests {
}
#[test]
fn test_first_last_empty() {
fn test_empty_batch() {
let batch = new_batch(&[], &[], &[], &[]);
assert_eq!(None, batch.first_timestamp());
assert_eq!(None, batch.last_timestamp());
assert_eq!(None, batch.first_sequence());
assert_eq!(None, batch.last_sequence());
assert!(batch.timestamps_native().is_none());
}
#[test]
@@ -707,6 +752,17 @@ mod tests {
assert_eq!(expect, batch);
}
/// A non-empty batch exposes its timestamps as a native slice, whatever the op types are.
#[test]
fn test_timestamps_native() {
    let op_types = [OpType::Put, OpType::Delete, OpType::Put, OpType::Put];
    let batch = new_batch(
        &[1, 2, 3, 4],
        &[11, 12, 13, 14],
        &op_types,
        &[21, 22, 23, 24],
    );

    let timestamps = batch.timestamps_native().unwrap();
    assert_eq!(&[1, 2, 3, 4], timestamps);
}
#[test]
fn test_concat_empty() {
let err = Batch::concat(vec![]).unwrap_err();

View File

@@ -15,7 +15,7 @@
//! Merge reader implementation.
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::collections::{BinaryHeap, VecDeque};
use std::mem;
use async_trait::async_trait;
@@ -37,32 +37,27 @@ pub struct MergeReader {
nodes: BinaryHeap<Node>,
/// Batches for the next primary key.
batch_merger: BatchMerger,
/// Sorted batches to output.
output: VecDeque<Batch>,
}
#[async_trait]
impl BatchReader for MergeReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
// Collect batches from sources for the same primary key and return
// the collected batch.
while !self.nodes.is_empty() {
// Peek current key.
let Some(current_key) = self.batch_merger.primary_key() else {
// The merger is empty, we could push it directly.
self.take_batch_from_heap().await?;
// Try next node.
continue;
};
// If next node has a different key, we have finish collecting current key.
// Safety: node is not empty.
if self.nodes.peek().unwrap().primary_key() != current_key {
break;
while !self.output.is_empty() || !self.nodes.is_empty() {
// Takes from sorted output if there are batches in it.
if let Some(batch) = self.output.pop_front() {
return Ok(Some(batch));
}
// They have the same primary key, we could take it and try next node.
self.take_batch_from_heap().await?;
// Collects batches to the merger.
self.collect_batches_to_merge().await?;
// Merge collected batches to output.
self.output = self.batch_merger.merge_batches()?;
}
// Merge collected batches.
self.batch_merger.merge_batches()
Ok(None)
}
}
@@ -81,9 +76,32 @@ impl MergeReader {
Ok(MergeReader {
nodes,
batch_merger: BatchMerger::new(),
output: VecDeque::new(),
})
}
/// Moves batches from the heap of nodes into the merger until the heap is drained or the
/// node on top of the heap belongs to a different primary key than the merger.
async fn collect_batches_to_merge(&mut self) -> Result<()> {
    while let Some(top_node) = self.nodes.peek() {
        // An empty merger has no key yet, so it accepts whatever is on top of the heap.
        // Otherwise we only keep collecting while the top node still has the merger's key.
        if let Some(current_key) = self.batch_merger.primary_key() {
            if top_node.primary_key() != current_key {
                // We have finished collecting batches of the current key.
                break;
            }
        }

        // Either the merger is empty or the keys match: take the batch and try the next node.
        self.take_batch_from_heap().await?;
    }

    Ok(())
}
/// Takes batch from heap top and reheap.
async fn take_batch_from_heap(&mut self) -> Result<()> {
let mut next_node = self.nodes.pop().unwrap();
@@ -201,32 +219,143 @@ impl BatchMerger {
/// Merge all buffered batches and returns the merged batch. Then
/// reset the buffer.
fn merge_batches(&mut self) -> Result<Option<Batch>> {
fn merge_batches(&mut self) -> Result<VecDeque<Batch>> {
if self.batches.is_empty() {
return Ok(None);
return Ok(VecDeque::new());
}
let batches = mem::take(&mut self.batches);
// Concat all batches.
let mut batch = Batch::concat(batches)?;
let mut output = VecDeque::with_capacity(self.batches.len());
if self.is_sorted {
// Fast path. We can output batches directly.
for batch in self.batches.drain(..) {
output_batch(&mut output, batch)?;
}
// TODO(yingwen): metrics for sorted and unsorted batches.
if !self.is_sorted {
// Slow path. We need to merge overlapping batches. For simplicity, we
// just sort the all batches and remove duplications.
batch.sort_and_dedup()?;
// We don't need to remove duplications if timestamps of batches
// are not overlapping.
return Ok(output);
}
// Filter rows by op type. Currently, the reader only removes deleted rows but doesn't filter
// rows by sequence for simplicity and performance reason.
batch.filter_deleted()?;
// Reset merger.
// Slow path. We need to merge overlapping batches.
// Constructs a heap from batches. Batches in the heap are never empty, so we need to
// check this before pushing a batch into the heap.
let mut heap = BinaryHeap::from_iter(self.batches.drain(..).map(CompareTimeSeq));
// Reset merger as sorted as we have cleared batches.
self.is_sorted = true;
Ok(Some(batch))
// Sorts batches.
while let Some(top) = heap.pop() {
let top = top.0;
let Some(next) = heap.peek() else {
// If there is no remaining batch, we can output the top-most batch.
output_batch(&mut output, top)?;
break;
};
let next = &next.0;
if top.last_timestamp() < next.first_timestamp() {
// If the top-most batch doesn't overlap with the next batch, we can output it.
output_batch(&mut output, top)?;
continue;
}
// Safety: Batches (top, next) in the heap is not empty, so we can use unwrap here.
// Min timestamp in the next batch.
let next_min_ts = next.first_timestamp().unwrap();
let timestamps = top.timestamps_native().unwrap();
// Binary searches the timestamp in the top batch.
// Safety: Batches should have the same timestamp resolution so we can compare the native
// value directly.
match timestamps.binary_search(&next_min_ts.value()) {
Ok(pos) => {
// They have duplicate timestamps. Outputs non overlapping timestamps.
// Batch itself doesn't contain duplicate timestamps so timestamps before `pos`
// must be less than `next_min_ts`.
// It's possible to output a very small batch but concatenating small batches
// slows down the reader.
output_batch(&mut output, top.slice(0, pos))?;
// Removes duplicate timestamp and fixes the heap. Keeps the timestamp with largest
// sequence.
// Safety: pos is a valid index returned by `binary_search` and `sequences` are always
// not null.
if top.get_sequence(pos) > next.first_sequence().unwrap() {
// Safety: `next` is not None.
let next = heap.pop().unwrap().0;
// Keeps the timestamp in top and skips the first timestamp in the `next`
// batch.
push_remaining_to_heap(&mut heap, next, 1);
// Skips already outputted timestamps.
push_remaining_to_heap(&mut heap, top, pos);
} else {
// Keeps timestamp in next and skips the duplicated timestamp and already outputted
// timestamp in top.
push_remaining_to_heap(&mut heap, top, pos + 1);
}
}
Err(pos) => {
// No duplicate timestamp. Outputs timestamp before `pos`.
output_batch(&mut output, top.slice(0, pos))?;
push_remaining_to_heap(&mut heap, top, pos);
}
}
}
Ok(output)
}
}
/// Drops the first `num_to_skip` rows of `batch` and pushes what is left into `heap`.
///
/// Nothing is pushed when no row remains, so the heap never receives an empty batch
/// from this function.
fn push_remaining_to_heap(heap: &mut BinaryHeap<CompareTimeSeq>, batch: Batch, num_to_skip: usize) {
    let num_rows = batch.num_rows();
    debug_assert!(num_rows >= num_to_skip);

    let remaining = num_rows - num_to_skip;
    if remaining > 0 {
        heap.push(CompareTimeSeq(batch.slice(num_to_skip, remaining)));
    }
}
/// Filters deleted rows out of `batch` and appends it to the back of `output`, unless
/// the filtered batch is empty.
fn output_batch(output: &mut VecDeque<Batch>, mut batch: Batch) -> Result<()> {
    // Rows are only filtered by op type here. The reader currently doesn't filter rows by
    // sequence, for simplicity and performance reasons.
    batch.filter_deleted()?;

    if !batch.is_empty() {
        output.push_back(batch);
    }

    Ok(())
}
/// Wrapper that orders a [Batch] by its first timestamp and its first sequence.
struct CompareTimeSeq(Batch);

impl PartialEq for CompareTimeSeq {
    /// Two wrappers are equal when both the first timestamps and the first sequences match.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.0, &other.0);
        lhs.first_timestamp() == rhs.first_timestamp()
            && lhs.first_sequence() == rhs.first_sequence()
    }
}

impl Eq for CompareTimeSeq {}

impl PartialOrd for CompareTimeSeq {
    /// Delegates to the total order defined by [Ord].
    fn partial_cmp(&self, other: &CompareTimeSeq) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for CompareTimeSeq {
    /// The heap is a max heap, so the ordering is inverted: a batch with a smaller first
    /// timestamp compares as greater, and for equal first timestamps the batch with the
    /// larger first sequence compares as greater.
    fn cmp(&self, other: &CompareTimeSeq) -> Ordering {
        let (lhs, rhs) = (&self.0, &other.0);
        // The operands are swapped relative to "timestamp asc, sequence desc", which is
        // the same as computing that ordering and reversing the result.
        rhs.first_timestamp()
            .cmp(&lhs.first_timestamp())
            .then_with(|| lhs.first_sequence().cmp(&rhs.first_sequence()))
    }
}
@@ -396,17 +525,19 @@ mod tests {
&[
new_batch(
b"k1",
&[1, 2, 4, 5, 7],
&[11, 12, 14, 15, 17],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[21, 22, 24, 25, 27],
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(
b"k1",
&[4, 5],
&[14, 15],
&[OpType::Put, OpType::Put],
&[24, 25],
),
new_batch(b"k1", &[7], &[17], &[OpType::Put], &[27]),
new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]),
],
)
@@ -467,27 +598,63 @@ mod tests {
&[
new_batch(
b"k1",
&[1, 2, 3, 4],
&[11, 12, 10, 14],
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
&[21, 22, 33, 24],
),
new_batch(
b"k2",
&[1, 3, 10],
&[11, 13, 20],
&[OpType::Put, OpType::Put, OpType::Put],
&[21, 23, 30],
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]),
new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]),
new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]),
new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]),
new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]),
],
)
.await;
}
/// Merging must drop deleted rows: `k1` only has deleted rows in both sources, and `k2`
/// keeps just the single row that was put.
#[tokio::test]
async fn test_merge_deleted() {
    // First source: `k1` only has deletes, `k2` has one delete and one put.
    let k1_first = new_batch(
        b"k1",
        &[1, 2],
        &[11, 12],
        &[OpType::Delete, OpType::Delete],
        &[21, 22],
    );
    let k2_mixed = new_batch(
        b"k2",
        &[2, 3],
        &[12, 13],
        &[OpType::Delete, OpType::Put],
        &[22, 23],
    );
    let reader1 = VecBatchReader::new(&[k1_first, k2_mixed]);

    // Second source: more deletes for `k1`.
    let k1_second = new_batch(
        b"k1",
        &[4, 5],
        &[14, 15],
        &[OpType::Delete, OpType::Delete],
        &[24, 25],
    );
    let reader2 = VecBatchReader::new(&[k1_second]);

    let mut reader = MergeReaderBuilder::new()
        .push_batch_reader(Box::new(reader1))
        .push_batch_iter(Box::new(reader2))
        .build()
        .await
        .unwrap();

    // Only the put row of `k2` is expected in the output.
    let expected = [new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23])];
    check_reader_result(&mut reader, &expected).await;
}
#[test]
fn test_batch_merger_empty() {
let mut merger = BatchMerger::new();
assert!(merger.merge_batches().unwrap().is_none());
assert!(merger.merge_batches().unwrap().is_empty());
}
#[test]
@@ -509,7 +676,48 @@ mod tests {
&[22, 24],
));
assert!(!merger.is_sorted);
let batch = merger.merge_batches().unwrap().unwrap();
let batches = merger.merge_batches().unwrap();
let batch = Batch::concat(batches.into_iter().collect()).unwrap();
assert_eq!(
batch,
new_batch(
b"k1",
&[1, 2, 3, 4, 5],
&[10, 11, 10, 11, 10],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put
],
&[21, 22, 23, 24, 25]
)
);
assert!(merger.is_sorted);
}
#[test]
fn test_batch_merger_unsorted_by_heap() {
let mut merger = BatchMerger::new();
merger.push(new_batch(
b"k1",
&[1, 3, 5],
&[10, 10, 10],
&[OpType::Put, OpType::Put, OpType::Put],
&[21, 23, 25],
));
assert!(merger.is_sorted);
merger.push(new_batch(
b"k1",
&[2, 4],
&[11, 11],
&[OpType::Put, OpType::Put],
&[22, 24],
));
assert!(!merger.is_sorted);
let batches = merger.merge_batches().unwrap();
let batch = Batch::concat(batches.into_iter().collect()).unwrap();
assert_eq!(
batch,
new_batch(