diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index c16639415e..7c7f055abf 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -30,10 +30,13 @@ use datatypes::arrow; use datatypes::arrow::array::{Array, ArrayRef}; use datatypes::arrow::compute::SortOptions; use datatypes::arrow::row::{RowConverter, SortField}; -use datatypes::prelude::{DataType, ScalarVector}; +use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector}; +use datatypes::types::TimestampType; use datatypes::value::ValueRef; use datatypes::vectors::{ - BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef, + BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector, + TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector, + Vector, VectorRef, }; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; @@ -355,6 +358,47 @@ impl Batch { .collect() } + /// Returns timestamps in a native slice or `None` if the batch is empty. + pub(crate) fn timestamps_native(&self) -> Option<&[i64]> { + if self.timestamps.is_empty() { + return None; + } + + let values = match self.timestamps.data_type() { + ConcreteDataType::Timestamp(TimestampType::Second(_)) => self + .timestamps + .as_any() + .downcast_ref::() + .unwrap() + .as_arrow() + .values(), + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => self + .timestamps + .as_any() + .downcast_ref::() + .unwrap() + .as_arrow() + .values(), + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => self + .timestamps + .as_any() + .downcast_ref::() + .unwrap() + .as_arrow() + .values(), + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => self + .timestamps + .as_any() + .downcast_ref::() + .unwrap() + .as_arrow() + .values(), + other => panic!("timestamps in a Batch has other type {:?}", other), + }; + + Some(values) + } + /// Takes the batch in place. fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> { self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?; @@ -392,7 +436,7 @@ impl Batch { /// /// # Panics /// Panics if `index` is out-of-bound or the sequence vector returns null. - fn get_sequence(&self, index: usize) -> SequenceNumber { + pub(crate) fn get_sequence(&self, index: usize) -> SequenceNumber { // Safety: sequences is not null so it actually returns Some. self.sequences.get_data(index).unwrap() } @@ -646,12 +690,13 @@ mod tests { } #[test] - fn test_first_last_empty() { + fn test_empty_batch() { let batch = new_batch(&[], &[], &[], &[]); assert_eq!(None, batch.first_timestamp()); assert_eq!(None, batch.last_timestamp()); assert_eq!(None, batch.first_sequence()); assert_eq!(None, batch.last_sequence()); + assert!(batch.timestamps_native().is_none()); } #[test] @@ -707,6 +752,17 @@ mod tests { assert_eq!(expect, batch); } + #[test] + fn test_timestamps_native() { + let batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + assert_eq!(&[1, 2, 3, 4], batch.timestamps_native().unwrap()); + } + #[test] fn test_concat_empty() { let err = Batch::concat(vec![]).unwrap_err(); diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index 797e2f9f70..b86eb8d698 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -15,7 +15,7 @@ //! Merge reader implementation. use std::cmp::Ordering; -use std::collections::BinaryHeap; +use std::collections::{BinaryHeap, VecDeque}; use std::mem; use async_trait::async_trait; @@ -37,32 +37,27 @@ pub struct MergeReader { nodes: BinaryHeap, /// Batches for the next primary key. batch_merger: BatchMerger, + /// Sorted batches to output. + output: VecDeque, } #[async_trait] impl BatchReader for MergeReader { async fn next_batch(&mut self) -> Result> { - // Collect batches from sources for the same primary key and return - // the collected batch. - while !self.nodes.is_empty() { - // Peek current key. - let Some(current_key) = self.batch_merger.primary_key() else { - // The merger is empty, we could push it directly. - self.take_batch_from_heap().await?; - // Try next node. - continue; - }; - // If next node has a different key, we have finish collecting current key. - // Safety: node is not empty. - if self.nodes.peek().unwrap().primary_key() != current_key { - break; + while !self.output.is_empty() || !self.nodes.is_empty() { + // Takes from sorted output if there are batches in it. + if let Some(batch) = self.output.pop_front() { + return Ok(Some(batch)); } - // They have the same primary key, we could take it and try next node. - self.take_batch_from_heap().await?; + + // Collects batches to the merger. + self.collect_batches_to_merge().await?; + + // Merge collected batches to output. + self.output = self.batch_merger.merge_batches()?; } - // Merge collected batches. - self.batch_merger.merge_batches() + Ok(None) } } @@ -81,9 +76,32 @@ impl MergeReader { Ok(MergeReader { nodes, batch_merger: BatchMerger::new(), + output: VecDeque::new(), }) } + /// Collect batches from sources for the same primary key. + async fn collect_batches_to_merge(&mut self) -> Result<()> { + while !self.nodes.is_empty() { + // Peek current key. + let Some(current_key) = self.batch_merger.primary_key() else { + // The merger is empty, we could push it directly. + self.take_batch_from_heap().await?; + // Try next node. + continue; + }; + // If next node has a different key, we have finish collecting current key. + // Safety: node is not empty. + if self.nodes.peek().unwrap().primary_key() != current_key { + break; + } + // They have the same primary key, we could take it and try next node. + self.take_batch_from_heap().await?; + } + + Ok(()) + } + /// Takes batch from heap top and reheap. async fn take_batch_from_heap(&mut self) -> Result<()> { let mut next_node = self.nodes.pop().unwrap(); @@ -201,32 +219,143 @@ impl BatchMerger { /// Merge all buffered batches and returns the merged batch. Then /// reset the buffer. - fn merge_batches(&mut self) -> Result> { + fn merge_batches(&mut self) -> Result> { if self.batches.is_empty() { - return Ok(None); + return Ok(VecDeque::new()); } - let batches = mem::take(&mut self.batches); - // Concat all batches. - let mut batch = Batch::concat(batches)?; + let mut output = VecDeque::with_capacity(self.batches.len()); + if self.is_sorted { + // Fast path. We can output batches directly. + for batch in self.batches.drain(..) { + output_batch(&mut output, batch)?; + } - // TODO(yingwen): metrics for sorted and unsorted batches. - if !self.is_sorted { - // Slow path. We need to merge overlapping batches. For simplicity, we - // just sort the all batches and remove duplications. - batch.sort_and_dedup()?; - // We don't need to remove duplications if timestamps of batches - // are not overlapping. + return Ok(output); } - // Filter rows by op type. Currently, the reader only removes deleted rows but doesn't filter - // rows by sequence for simplicity and performance reason. - batch.filter_deleted()?; - - // Reset merger. + // Slow path. We need to merge overlapping batches. + // Constructs a heap from batches. Batches in the heap is not empty, we need to check + // this before pushing a batch into the heap. + let mut heap = BinaryHeap::from_iter(self.batches.drain(..).map(CompareTimeSeq)); + // Reset merger as sorted as we have cleared batches. self.is_sorted = true; - Ok(Some(batch)) + // Sorts batches. + while let Some(top) = heap.pop() { + let top = top.0; + let Some(next) = heap.peek() else { + // If there is no remaining batch, we can output the top-most batch. + output_batch(&mut output, top)?; + break; + }; + let next = &next.0; + + if top.last_timestamp() < next.first_timestamp() { + // If the top-most batch doesn't overlaps with the next batch, we can output it. + output_batch(&mut output, top)?; + continue; + } + + // Safety: Batches (top, next) in the heap is not empty, so we can use unwrap here. + // Min timestamp in the next batch. + let next_min_ts = next.first_timestamp().unwrap(); + let timestamps = top.timestamps_native().unwrap(); + // Binary searches the timestamp in the top batch. + // Safety: Batches should have the same timestamp resolution so we can compare the native + // value directly. + match timestamps.binary_search(&next_min_ts.value()) { + Ok(pos) => { + // They have duplicate timestamps. Outputs non overlapping timestamps. + // Batch itself doesn't contain duplicate timestamps so timestamps before `pos` + // must be less than `next_min_ts`. + // It's possible to output a very small batch but concatenating small batches + // slows down the reader. + output_batch(&mut output, top.slice(0, pos))?; + // Removes duplicate timestamp and fixes the heap. Keeps the timestamp with largest + // sequence. + // Safety: pos is a valid index returned by `binary_search` and `sequences` are always + // not null. + if top.get_sequence(pos) > next.first_sequence().unwrap() { + // Safety: `next` is not None. + let next = heap.pop().unwrap().0; + // Keeps the timestamp in top and skips the first timestamp in the `next` + // batch. + push_remaining_to_heap(&mut heap, next, 1); + // Skips already outputted timestamps. + push_remaining_to_heap(&mut heap, top, pos); + } else { + // Keeps timestamp in next and skips the duplicated timestamp and already outputted + // timestamp in top. + push_remaining_to_heap(&mut heap, top, pos + 1); + } + } + Err(pos) => { + // No duplicate timestamp. Outputs timestamp before `pos`. + output_batch(&mut output, top.slice(0, pos))?; + push_remaining_to_heap(&mut heap, top, pos); + } + } + } + + Ok(output) + } +} + +/// Skips first `num_to_skip` rows from the batch and pushes remaining batch into the heap if the batch +/// is still not empty. +fn push_remaining_to_heap(heap: &mut BinaryHeap, batch: Batch, num_to_skip: usize) { + debug_assert!(batch.num_rows() >= num_to_skip); + let remaining = batch.num_rows() - num_to_skip; + if remaining == 0 { + // Nothing remains. + return; + } + + heap.push(CompareTimeSeq(batch.slice(num_to_skip, remaining))); +} + +/// Removes deleted items from the `batch` and pushes it back to the `output` if +/// the `batch` is not empty. +fn output_batch(output: &mut VecDeque, mut batch: Batch) -> Result<()> { + // Filter rows by op type. Currently, the reader only removes deleted rows but doesn't filter + // rows by sequence for simplicity and performance reason. + batch.filter_deleted()?; + if batch.is_empty() { + return Ok(()); + } + + output.push_back(batch); + Ok(()) +} + +/// Compare [Batch] by timestamp and sequence. +struct CompareTimeSeq(Batch); + +impl PartialEq for CompareTimeSeq { + fn eq(&self, other: &Self) -> bool { + self.0.first_timestamp() == other.0.first_timestamp() + && self.0.first_sequence() == other.0.first_sequence() + } +} + +impl Eq for CompareTimeSeq {} + +impl PartialOrd for CompareTimeSeq { + fn partial_cmp(&self, other: &CompareTimeSeq) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for CompareTimeSeq { + /// Compares by first timestamp desc, first sequence. (The heap is a max heap). + fn cmp(&self, other: &CompareTimeSeq) -> Ordering { + self.0 + .first_timestamp() + .cmp(&other.0.first_timestamp()) + .then_with(|| other.0.first_sequence().cmp(&self.0.first_sequence())) + // We reverse the ordering as the heap is a max heap. + .reverse() } } @@ -396,17 +525,19 @@ mod tests { &[ new_batch( b"k1", - &[1, 2, 4, 5, 7], - &[11, 12, 14, 15, 17], - &[ - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - ], - &[21, 22, 24, 25, 27], + &[1, 2], + &[11, 12], + &[OpType::Put, OpType::Put], + &[21, 22], ), + new_batch( + b"k1", + &[4, 5], + &[14, 15], + &[OpType::Put, OpType::Put], + &[24, 25], + ), + new_batch(b"k1", &[7], &[17], &[OpType::Put], &[27]), new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]), ], ) @@ -467,27 +598,63 @@ mod tests { &[ new_batch( b"k1", - &[1, 2, 3, 4], - &[11, 12, 10, 14], - &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], - &[21, 22, 33, 24], - ), - new_batch( - b"k2", - &[1, 3, 10], - &[11, 13, 20], - &[OpType::Put, OpType::Put, OpType::Put], - &[21, 23, 30], + &[1, 2], + &[11, 12], + &[OpType::Put, OpType::Put], + &[21, 22], ), + new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]), + new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]), + new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]), + new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]), + new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]), ], ) .await; } + #[tokio::test] + async fn test_merge_deleted() { + let reader1 = VecBatchReader::new(&[ + new_batch( + b"k1", + &[1, 2], + &[11, 12], + &[OpType::Delete, OpType::Delete], + &[21, 22], + ), + new_batch( + b"k2", + &[2, 3], + &[12, 13], + &[OpType::Delete, OpType::Put], + &[22, 23], + ), + ]); + let reader2 = VecBatchReader::new(&[new_batch( + b"k1", + &[4, 5], + &[14, 15], + &[OpType::Delete, OpType::Delete], + &[24, 25], + )]); + let mut reader = MergeReaderBuilder::new() + .push_batch_reader(Box::new(reader1)) + .push_batch_iter(Box::new(reader2)) + .build() + .await + .unwrap(); + check_reader_result( + &mut reader, + &[new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23])], + ) + .await; + } + #[test] fn test_batch_merger_empty() { let mut merger = BatchMerger::new(); - assert!(merger.merge_batches().unwrap().is_none()); + assert!(merger.merge_batches().unwrap().is_empty()); } #[test] @@ -509,7 +676,48 @@ mod tests { &[22, 24], )); assert!(!merger.is_sorted); - let batch = merger.merge_batches().unwrap().unwrap(); + let batches = merger.merge_batches().unwrap(); + let batch = Batch::concat(batches.into_iter().collect()).unwrap(); + assert_eq!( + batch, + new_batch( + b"k1", + &[1, 2, 3, 4, 5], + &[10, 11, 10, 11, 10], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put + ], + &[21, 22, 23, 24, 25] + ) + ); + assert!(merger.is_sorted); + } + + #[test] + fn test_batch_merger_unsorted_by_heap() { + let mut merger = BatchMerger::new(); + merger.push(new_batch( + b"k1", + &[1, 3, 5], + &[10, 10, 10], + &[OpType::Put, OpType::Put, OpType::Put], + &[21, 23, 25], + )); + assert!(merger.is_sorted); + merger.push(new_batch( + b"k1", + &[2, 4], + &[11, 11], + &[OpType::Put, OpType::Put], + &[22, 24], + )); + assert!(!merger.is_sorted); + let batches = merger.merge_batches().unwrap(); + let batch = Batch::concat(batches.into_iter().collect()).unwrap(); assert_eq!( batch, new_batch(