From 465c8f714ebde1b46aac110303a857f97fbb1e0b Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 31 Oct 2023 14:36:31 +0800 Subject: [PATCH] feat(mito): avoid buffering all batches for the same primary key (#2658) * feat: Control merge reader by batch size * test: test heap have large range * fix: merge one batch * test: merge many duplicates * test: test reheap hot * feat: don't handle empty batch in merge reader --- src/mito2/src/read/merge.rs | 959 +++++++++++++++++++++++----------- src/storage/src/read/merge.rs | 4 +- 2 files changed, 647 insertions(+), 316 deletions(-) diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index b86eb8d698..187449c016 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -15,117 +15,264 @@ //! Merge reader implementation. use std::cmp::Ordering; -use std::collections::{BinaryHeap, VecDeque}; +use std::collections::BinaryHeap; use std::mem; use async_trait::async_trait; +use common_time::Timestamp; use crate::error::Result; use crate::memtable::BoxedBatchIterator; use crate::read::{Batch, BatchReader, BoxedBatchReader, Source}; +/// Minimum batch size to output. +const MIN_BATCH_SIZE: usize = 64; + /// Reader to merge sorted batches. /// /// The merge reader merges [Batch]es from multiple sources that yield sorted batches. /// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can /// ignore op type as sequence is already unique). /// 2. Batch doesn't have duplicate elements (elements with the same primary key and time index). +/// 3. Batches from sources **must** not be empty. pub struct MergeReader { - /// Holds a min-heap for all [Node]s. Each node yields batches from a `source`. + /// Holds [Node]s whose key range of current batch **is** overlapped with the merge window. + /// Each node yields batches from a `source`. /// - /// `Node` in this heap **must** not be EOF. - nodes: BinaryHeap, - /// Batches for the next primary key. 
+ /// [Node] in this heap **must** not be empty. A `merge window` is the (primary key, timestamp) + /// range of the **root node** in the `hot` heap. + hot: BinaryHeap, + /// Holds `Node` whose key range of current batch **isn't** overlapped with the merge window. + /// + /// `Node` in this heap **must** not be empty. + cold: BinaryHeap, + /// Batches to output. batch_merger: BatchMerger, - /// Sorted batches to output. - output: VecDeque, + /// Suggested size of each batch. The batch returned by the reader can have more rows than the + /// batch size. + batch_size: usize, } #[async_trait] impl BatchReader for MergeReader { async fn next_batch(&mut self) -> Result> { - while !self.output.is_empty() || !self.nodes.is_empty() { - // Takes from sorted output if there are batches in it. - if let Some(batch) = self.output.pop_front() { - return Ok(Some(batch)); + while !self.hot.is_empty() && self.batch_merger.num_rows() < self.batch_size { + if let Some(current_key) = self.batch_merger.primary_key() { + // If the hottest node has a different key, we have finish collecting current key. + // Safety: hot is not empty. + if self.hot.peek().unwrap().primary_key() != current_key { + break; + } } - // Collects batches to the merger. - self.collect_batches_to_merge().await?; - - // Merge collected batches to output. - self.output = self.batch_merger.merge_batches()?; + if self.hot.len() == 1 { + // No need to do merge sort if only one batch in the hot heap. + self.fetch_batch_from_hottest().await?; + } else { + // We could only fetch rows that less than the next node from the hottest node. + self.fetch_rows_from_hottest().await?; + } } - Ok(None) + if self.batch_merger.is_empty() { + // Nothing fetched. + Ok(None) + } else { + self.batch_merger.merge_batches() + } } } impl MergeReader { - /// Creates a new [MergeReader]. - pub async fn new(sources: Vec) -> Result { - let mut nodes = BinaryHeap::with_capacity(sources.len()); + /// Creates and initializes a new [MergeReader]. 
+ pub async fn new(sources: Vec, batch_size: usize) -> Result { + let mut cold = BinaryHeap::with_capacity(sources.len()); + let hot = BinaryHeap::with_capacity(sources.len()); for source in sources { let node = Node::new(source).await?; if !node.is_eof() { - // Ensure `nodes` don't have eof node. - nodes.push(node); + // Ensure `cold` don't have eof nodes. + cold.push(node); } } - Ok(MergeReader { - nodes, + let mut reader = MergeReader { + hot, + cold, batch_merger: BatchMerger::new(), - output: VecDeque::new(), - }) + batch_size, + }; + // Initializes the reader. + reader.refill_hot(); + + Ok(reader) } - /// Collect batches from sources for the same primary key. - async fn collect_batches_to_merge(&mut self) -> Result<()> { - while !self.nodes.is_empty() { - // Peek current key. - let Some(current_key) = self.batch_merger.primary_key() else { - // The merger is empty, we could push it directly. - self.take_batch_from_heap().await?; - // Try next node. - continue; - }; - // If next node has a different key, we have finish collecting current key. - // Safety: node is not empty. - if self.nodes.peek().unwrap().primary_key() != current_key { - break; + /// Moves nodes in `cold` heap, whose key range is overlapped with current merge + /// window to `hot` heap. + fn refill_hot(&mut self) { + while !self.cold.is_empty() { + if let Some(merge_window) = self.hot.peek() { + let warmest = self.cold.peek().unwrap(); + if warmest.is_behind(merge_window) { + // if the warmest node in the `cold` heap is totally after the + // `merge_window`, then no need to add more nodes into the `hot` + // heap for merge sorting. + break; + } + } + + let warmest = self.cold.pop().unwrap(); + self.hot.push(warmest); + } + } + + /// Fetches one batch from the hottest node. 
+ async fn fetch_batch_from_hottest(&mut self) -> Result<()> { + assert_eq!(1, self.hot.len()); + + let mut hottest = self.hot.pop().unwrap(); + let batch = hottest.fetch_batch().await?; + self.batch_merger.push(batch)?; + self.reheap(hottest) + } + + /// Fetches non-duplicated rows from the hottest node and skips the timestamp duplicated + /// with the first timestamp in the next node. + async fn fetch_rows_from_hottest(&mut self) -> Result<()> { + // Safety: `next_batch()` ensures the hot heap has more than 1 element. + // Pop hottest node. + let mut top_node = self.hot.pop().unwrap(); + let top = top_node.current_batch(); + // Min timestamp in the next batch. + let next_min_ts = { + let next_node = self.hot.peek().unwrap(); + let next = next_node.current_batch(); + // top and next have overlapping rows so they must have same primary keys. + debug_assert_eq!(top.primary_key(), next.primary_key()); + // Safety: Batches in the heap are not empty, so we can use unwrap here. + next.first_timestamp().unwrap() + }; + + // Safety: Batches in the heap are not empty, so we can use unwrap here. + let timestamps = top.timestamps_native().unwrap(); + // Binary searches the timestamp in the top batch. + // Safety: Batches should have the same timestamp resolution so we can compare the native + // value directly. + match timestamps.binary_search(&next_min_ts.value()) { + Ok(pos) => { + // They have duplicate timestamps. Outputs timestamps before the duplicated timestamp. + // Batch itself doesn't contain duplicate timestamps so timestamps before `pos` + // must be less than `next_min_ts`. + self.batch_merger.push(top.slice(0, pos))?; + // This keeps the duplicate timestamp in the node. + top_node.skip_rows(pos).await?; + // The merge window should contain this timestamp so only nodes in the hot heap + // have this timestamp. 
+ self.filter_first_duplicate_timestamp_in_hot(top_node, next_min_ts) + .await?; + } + Err(pos) => { + // No duplicate timestamp. Outputs timestamp before `pos`. + self.batch_merger.push(top.slice(0, pos))?; + top_node.skip_rows(pos).await?; + self.reheap(top_node)?; } - // They have the same primary key, we could take it and try next node. - self.take_batch_from_heap().await?; } Ok(()) } - /// Takes batch from heap top and reheap. - async fn take_batch_from_heap(&mut self) -> Result<()> { - let mut next_node = self.nodes.pop().unwrap(); - let batch = next_node.fetch_batch().await?; - self.batch_merger.push(batch); + /// Filters the first duplicate `timestamp` in `top_node` and `hot` heap. Only keeps the timestamp + /// with the maximum sequence. + async fn filter_first_duplicate_timestamp_in_hot( + &mut self, + top_node: Node, + timestamp: Timestamp, + ) -> Result<()> { + debug_assert_eq!( + top_node.current_batch().first_timestamp().unwrap(), + timestamp + ); - // Insert the node back to the heap. - // If the node reaches EOF, ignores it. This ensures nodes in the heap is always not EOF. - if next_node.is_eof() { - return Ok(()); + // The node with maximum sequence. + let mut max_seq_node = top_node; + let mut max_seq = max_seq_node.current_batch().first_sequence().unwrap(); + while let Some(mut next_node) = self.hot.pop() { + // Safety: Batches in the heap is not empty. + let next_first_ts = next_node.current_batch().first_timestamp().unwrap(); + let next_first_seq = next_node.current_batch().first_sequence().unwrap(); + + if next_first_ts != timestamp { + // We are done, push the node with max seq. + self.cold.push(next_node); + break; + } + + if max_seq < next_first_seq { + // The next node has larger seq. 
+ max_seq_node.skip_rows(1).await?; + if !max_seq_node.is_eof() { + self.cold.push(max_seq_node); + } + max_seq_node = next_node; + max_seq = next_first_seq; + } else { + next_node.skip_rows(1).await?; + if !next_node.is_eof() { + // The next node still has rows after skipping the duplicate, keep it. + self.cold.push(next_node); + } + } + } + debug_assert!(!max_seq_node.is_eof()); + self.cold.push(max_seq_node); + + // The merge window is updated, we need to refill the hot heap. + self.refill_hot(); + + Ok(()) + } + + /// Push the node popped from `hot` back to a proper heap. + fn reheap(&mut self, node: Node) -> Result<()> { + if node.is_eof() { + // If the node is EOF, don't put it into the heap again. + // The merge window would be updated, need to refill the hot heap. + self.refill_hot(); + } else { + // Find a proper heap for this node. + let node_is_cold = if let Some(hottest) = self.hot.peek() { + // If key range of this node is behind the hottest node's then we can + // push it to the cold heap. Otherwise we should push it to the hot heap. + node.is_behind(hottest) + } else { + // The hot heap is empty, but we don't know whether the current + // batch of this node is still the hottest. + true + }; + + if node_is_cold { + self.cold.push(node); + } else { + self.hot.push(node); + } + // Anyway, the merge window has been changed, we need to refill the hot heap. + self.refill_hot(); } - self.nodes.push(next_node); Ok(()) } } /// Builder to build and initialize a [MergeReader]. -#[derive(Default)] pub struct MergeReaderBuilder { /// Input sources. /// /// All source must yield batches with the same schema. sources: Vec, + /// Batch size of the reader. + batch_size: usize, } impl MergeReaderBuilder { @@ -140,25 +287,40 @@ impl MergeReaderBuilder { self } - /// Push a batch iterator to sources. + /// Pushes a batch iterator to sources. pub fn push_batch_iter(&mut self, iter: BoxedBatchIterator) -> &mut Self { self.sources.push(Source::Iter(iter)); self } + /// Sets the batch size of the reader. 
+ pub fn batch_size(&mut self, size: usize) -> &mut Self { + self.batch_size = if size == 0 { MIN_BATCH_SIZE } else { size }; + self + } + /// Builds and initializes the reader, then resets the builder. pub async fn build(&mut self) -> Result { let sources = mem::take(&mut self.sources); - MergeReader::new(sources).await + MergeReader::new(sources, self.batch_size).await } } -/// Helper to merge batches for same primary key. +impl Default for MergeReaderBuilder { + fn default() -> Self { + MergeReaderBuilder { + sources: Vec::new(), + batch_size: MIN_BATCH_SIZE, + } + } +} + +/// Helper to collect and merge small batches for same primary key. struct BatchMerger { /// Buffered non-empty batches to merge. batches: Vec, - /// Whether the batch buffer is still sorted. - is_sorted: bool, + /// Number of rows in the batch. + num_rows: usize, } impl BatchMerger { @@ -166,196 +328,63 @@ impl BatchMerger { fn new() -> BatchMerger { BatchMerger { batches: Vec::new(), - is_sorted: true, // An empty merger is always sorted. + num_rows: 0, } } + /// Returns the number of rows. + fn num_rows(&self) -> usize { + self.num_rows + } + + /// Returns true if the merger is empty. + fn is_empty(&self) -> bool { + self.num_rows() == 0 + } + /// Returns the primary key of current merger and `None` if the merger is empty. fn primary_key(&self) -> Option<&[u8]> { self.batches.first().map(|batch| batch.primary_key()) } - /// Push a `batch` into the merger. + /// Removes deleted entries and pushes a `batch` into the merger. /// - /// Ignore the `batch` if it is empty. + /// Ignores the `batch` if it is empty. /// /// # Panics /// Panics if the `batch` has another primary key. 
- fn push(&mut self, batch: Batch) { + fn push(&mut self, mut batch: Batch) -> Result<()> { + debug_assert!(self + .batches + .last() + .map(|b| b.primary_key() == batch.primary_key()) + .unwrap_or(true)); + + batch.filter_deleted()?; if batch.is_empty() { - return; + return Ok(()); } - if self.batches.is_empty() || !self.is_sorted { - // Merger is empty or is not sorted, we can push the batch directly. - self.batches.push(batch); - return; - } - - // Merger is sorted, checks whether we can still preserve sorted state. - let last_batch = self.batches.last().unwrap(); - assert_eq!(last_batch.primary_key(), batch.primary_key()); - match last_batch.last_timestamp().cmp(&batch.first_timestamp()) { - Ordering::Less => { - // Still sorted. - self.batches.push(batch); - return; - } - Ordering::Equal => { - // Check sequence. - if last_batch.last_sequence() > batch.first_sequence() { - // Still sorted. - self.batches.push(batch); - return; - } - } - Ordering::Greater => (), - } - - // Merger is no longer sorted. + self.num_rows += batch.num_rows(); self.batches.push(batch); - self.is_sorted = false; + + Ok(()) } /// Merge all buffered batches and returns the merged batch. Then /// reset the buffer. - fn merge_batches(&mut self) -> Result> { + fn merge_batches(&mut self) -> Result> { if self.batches.is_empty() { - return Ok(VecDeque::new()); + return Ok(None); } - let mut output = VecDeque::with_capacity(self.batches.len()); - if self.is_sorted { - // Fast path. We can output batches directly. - for batch in self.batches.drain(..) { - output_batch(&mut output, batch)?; - } - - return Ok(output); + // Reset number of rows. + self.num_rows = 0; + if self.batches.len() == 1 { + return Ok(self.batches.pop()); } - - // Slow path. We need to merge overlapping batches. - // Constructs a heap from batches. Batches in the heap is not empty, we need to check - // this before pushing a batch into the heap. 
- let mut heap = BinaryHeap::from_iter(self.batches.drain(..).map(CompareTimeSeq)); - // Reset merger as sorted as we have cleared batches. - self.is_sorted = true; - - // Sorts batches. - while let Some(top) = heap.pop() { - let top = top.0; - let Some(next) = heap.peek() else { - // If there is no remaining batch, we can output the top-most batch. - output_batch(&mut output, top)?; - break; - }; - let next = &next.0; - - if top.last_timestamp() < next.first_timestamp() { - // If the top-most batch doesn't overlaps with the next batch, we can output it. - output_batch(&mut output, top)?; - continue; - } - - // Safety: Batches (top, next) in the heap is not empty, so we can use unwrap here. - // Min timestamp in the next batch. - let next_min_ts = next.first_timestamp().unwrap(); - let timestamps = top.timestamps_native().unwrap(); - // Binary searches the timestamp in the top batch. - // Safety: Batches should have the same timestamp resolution so we can compare the native - // value directly. - match timestamps.binary_search(&next_min_ts.value()) { - Ok(pos) => { - // They have duplicate timestamps. Outputs non overlapping timestamps. - // Batch itself doesn't contain duplicate timestamps so timestamps before `pos` - // must be less than `next_min_ts`. - // It's possible to output a very small batch but concatenating small batches - // slows down the reader. - output_batch(&mut output, top.slice(0, pos))?; - // Removes duplicate timestamp and fixes the heap. Keeps the timestamp with largest - // sequence. - // Safety: pos is a valid index returned by `binary_search` and `sequences` are always - // not null. - if top.get_sequence(pos) > next.first_sequence().unwrap() { - // Safety: `next` is not None. - let next = heap.pop().unwrap().0; - // Keeps the timestamp in top and skips the first timestamp in the `next` - // batch. - push_remaining_to_heap(&mut heap, next, 1); - // Skips already outputted timestamps. 
- push_remaining_to_heap(&mut heap, top, pos); - } else { - // Keeps timestamp in next and skips the duplicated timestamp and already outputted - // timestamp in top. - push_remaining_to_heap(&mut heap, top, pos + 1); - } - } - Err(pos) => { - // No duplicate timestamp. Outputs timestamp before `pos`. - output_batch(&mut output, top.slice(0, pos))?; - push_remaining_to_heap(&mut heap, top, pos); - } - } - } - - Ok(output) - } -} - -/// Skips first `num_to_skip` rows from the batch and pushes remaining batch into the heap if the batch -/// is still not empty. -fn push_remaining_to_heap(heap: &mut BinaryHeap, batch: Batch, num_to_skip: usize) { - debug_assert!(batch.num_rows() >= num_to_skip); - let remaining = batch.num_rows() - num_to_skip; - if remaining == 0 { - // Nothing remains. - return; - } - - heap.push(CompareTimeSeq(batch.slice(num_to_skip, remaining))); -} - -/// Removes deleted items from the `batch` and pushes it back to the `output` if -/// the `batch` is not empty. -fn output_batch(output: &mut VecDeque, mut batch: Batch) -> Result<()> { - // Filter rows by op type. Currently, the reader only removes deleted rows but doesn't filter - // rows by sequence for simplicity and performance reason. - batch.filter_deleted()?; - if batch.is_empty() { - return Ok(()); - } - - output.push_back(batch); - Ok(()) -} - -/// Compare [Batch] by timestamp and sequence. -struct CompareTimeSeq(Batch); - -impl PartialEq for CompareTimeSeq { - fn eq(&self, other: &Self) -> bool { - self.0.first_timestamp() == other.0.first_timestamp() - && self.0.first_sequence() == other.0.first_sequence() - } -} - -impl Eq for CompareTimeSeq {} - -impl PartialOrd for CompareTimeSeq { - fn partial_cmp(&self, other: &CompareTimeSeq) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for CompareTimeSeq { - /// Compares by first timestamp desc, first sequence. (The heap is a max heap). 
- fn cmp(&self, other: &CompareTimeSeq) -> Ordering { - self.0 - .first_timestamp() - .cmp(&other.0.first_timestamp()) - .then_with(|| other.0.first_sequence().cmp(&self.0.first_sequence())) - // We reverse the ordering as the heap is a max heap. - .reverse() + let batches = mem::take(&mut self.batches); + Batch::concat(batches).map(Some) } } @@ -363,7 +392,7 @@ impl Ord for CompareTimeSeq { struct Node { /// Data source of this `Node`. source: Source, - /// Current batch to be read. + /// Current batch to be read. The node ensures the batch is not empty. /// /// `None` means the `source` has reached EOF. current_batch: Option, @@ -374,6 +403,7 @@ impl Node { /// /// It tries to fetch one batch from the `source`. async fn new(mut source: Source) -> Result { + // Ensures batch is not empty. let current_batch = source.next_batch().await?.map(CompareFirst); Ok(Node { source, @@ -409,9 +439,49 @@ impl Node { /// Panics if the node has reached EOF. async fn fetch_batch(&mut self) -> Result { let current = self.current_batch.take().unwrap(); + // Ensures batch is not empty. self.current_batch = self.source.next_batch().await?.map(CompareFirst); Ok(current.0) } + + /// Returns true if the key range of current batch in `self` is behind (exclusive) current + /// batch in `other`. + /// + /// # Panics + /// Panics if either `self` or `other` is EOF. + fn is_behind(&self, other: &Node) -> bool { + debug_assert!(!self.current_batch().is_empty()); + debug_assert!(!other.current_batch().is_empty()); + + // We only compare pk and timestamp so nodes in the cold + // heap don't have overlapping timestamps with the hottest node + // in the hot heap. + self.primary_key().cmp(other.primary_key()).then_with(|| { + self.current_batch() + .first_timestamp() + .cmp(&other.current_batch().last_timestamp()) + }) == Ordering::Greater + } + + /// Skips first `num_to_skip` rows from node's current batch. If current batch is empty it fetches + /// next batch from the node. 
+ /// + /// # Panics + /// Panics if the node is EOF. + async fn skip_rows(&mut self, num_to_skip: usize) -> Result<()> { + let batch = self.current_batch(); + debug_assert!(batch.num_rows() >= num_to_skip); + let remaining = batch.num_rows() - num_to_skip; + if remaining == 0 { + // Nothing remains, we need to fetch next batch to ensure the batch is not empty. + self.fetch_batch().await?; + } else { + debug_assert!(!batch.is_empty()); + self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining))); + } + + Ok(()) + } } impl PartialEq for Node { @@ -525,20 +595,59 @@ mod tests { &[ new_batch( b"k1", - &[1, 2], - &[11, 12], - &[OpType::Put, OpType::Put], - &[21, 22], + &[1, 2, 4, 5, 7], + &[11, 12, 14, 15, 17], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[21, 22, 24, 25, 27], ), + new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]), + ], + ) + .await; + } + + #[tokio::test] + async fn test_merge_reheap_hot() { + let reader1 = VecBatchReader::new(&[ + new_batch( + b"k1", + &[1, 3], + &[10, 10], + &[OpType::Put, OpType::Put], + &[21, 23], + ), + new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]), + ]); + let reader2 = VecBatchReader::new(&[new_batch( + b"k1", + &[2, 4], + &[11, 11], + &[OpType::Put, OpType::Put], + &[32, 34], + )]); + let mut reader = MergeReaderBuilder::new() + .push_batch_reader(Box::new(reader1)) + .push_batch_iter(Box::new(reader2)) + .build() + .await + .unwrap(); + check_reader_result( + &mut reader, + &[ new_batch( b"k1", - &[4, 5], - &[14, 15], - &[OpType::Put, OpType::Put], - &[24, 25], + &[1, 2, 3, 4], + &[10, 11, 10, 11], + &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], + &[21, 32, 23, 34], ), - new_batch(b"k1", &[7], &[17], &[OpType::Put], &[27]), - new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]), + new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]), ], ) .await; @@ -598,16 +707,18 @@ mod tests { &[ new_batch( b"k1", - &[1, 2], - &[11, 12], - &[OpType::Put, 
OpType::Put], - &[21, 22], + &[1, 2, 3, 4], + &[11, 12, 10, 14], + &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], + &[21, 22, 33, 24], + ), + new_batch( + b"k2", + &[1, 3, 10], + &[11, 13, 20], + &[OpType::Put, OpType::Put, OpType::Put], + &[21, 23, 30], ), - new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]), - new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]), - new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]), - new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]), - new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]), ], ) .await; @@ -651,89 +762,307 @@ mod tests { .await; } + #[tokio::test] + async fn test_merge_next_node_empty() { + let reader1 = VecBatchReader::new(&[new_batch( + b"k1", + &[1, 2], + &[11, 12], + &[OpType::Put, OpType::Put], + &[21, 22], + )]); + // This reader will be empty after skipping the timestamp. + let reader2 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]); + let mut reader = MergeReaderBuilder::new() + .push_batch_reader(Box::new(reader1)) + .push_batch_iter(Box::new(reader2)) + .build() + .await + .unwrap(); + check_reader_result( + &mut reader, + &[new_batch( + b"k1", + &[1, 2], + &[11, 12], + &[OpType::Put, OpType::Put], + &[21, 22], + )], + ) + .await; + } + + #[tokio::test] + async fn test_merge_top_node_empty() { + // This reader will be empty after skipping the timestamp 2. 
+ let reader1 = VecBatchReader::new(&[new_batch( + b"k1", + &[1, 2], + &[10, 10], + &[OpType::Put, OpType::Put], + &[21, 22], + )]); + let reader2 = VecBatchReader::new(&[new_batch( + b"k1", + &[2, 3], + &[11, 11], + &[OpType::Put, OpType::Put], + &[32, 33], + )]); + let mut reader = MergeReaderBuilder::new() + .push_batch_reader(Box::new(reader1)) + .push_batch_iter(Box::new(reader2)) + .build() + .await + .unwrap(); + check_reader_result( + &mut reader, + &[new_batch( + b"k1", + &[1, 2, 3], + &[10, 11, 11], + &[OpType::Put, OpType::Put, OpType::Put], + &[21, 32, 33], + )], + ) + .await; + } + + #[tokio::test] + async fn test_merge_large_range() { + let reader1 = VecBatchReader::new(&[new_batch( + b"k1", + &[1, 10], + &[10, 10], + &[OpType::Put, OpType::Put], + &[21, 30], + )]); + let reader2 = VecBatchReader::new(&[new_batch( + b"k1", + &[1, 20], + &[11, 11], + &[OpType::Put, OpType::Put], + &[31, 40], + )]); + // The hot heap have a node that doesn't have duplicate + // timestamps. + let reader3 = VecBatchReader::new(&[new_batch( + b"k1", + &[6, 8], + &[11, 11], + &[OpType::Put, OpType::Put], + &[36, 38], + )]); + let mut reader = MergeReaderBuilder::new() + .push_batch_reader(Box::new(reader1)) + .push_batch_iter(Box::new(reader2)) + .push_batch_reader(Box::new(reader3)) + .build() + .await + .unwrap(); + check_reader_result( + &mut reader, + &[new_batch( + b"k1", + &[1, 6, 8, 10, 20], + &[11, 11, 11, 10, 11], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[31, 36, 38, 30, 40], + )], + ) + .await; + } + + #[tokio::test] + async fn test_merge_many_duplicates() { + let mut builder = MergeReaderBuilder::new(); + builder.batch_size(3); + for i in 0..10 { + let batches: Vec<_> = (0..8) + .map(|ts| new_batch(b"k1", &[ts], &[i], &[OpType::Put], &[100])) + .collect(); + let reader = VecBatchReader::new(&batches); + builder.push_batch_reader(Box::new(reader)); + } + let mut reader = builder.build().await.unwrap(); + 
check_reader_result( + &mut reader, + &[ + new_batch( + b"k1", + &[0, 1, 2], + &[9, 9, 9], + &[OpType::Put, OpType::Put, OpType::Put], + &[100, 100, 100], + ), + new_batch( + b"k1", + &[3, 4, 5], + &[9, 9, 9], + &[OpType::Put, OpType::Put, OpType::Put], + &[100, 100, 100], + ), + new_batch( + b"k1", + &[6, 7], + &[9, 9], + &[OpType::Put, OpType::Put], + &[100, 100], + ), + ], + ) + .await; + } + + #[tokio::test] + async fn test_merge_more_than_batch_size() { + let batches: Vec<_> = (0..MIN_BATCH_SIZE as i64 * 2) + .map(|ts| new_batch(b"k1", &[ts], &[10], &[OpType::Put], &[100])) + .collect(); + let reader = VecBatchReader::new(&batches); + let mut reader = MergeReaderBuilder::new() + .push_batch_reader(Box::new(reader)) + // Still use the default batch size. + .batch_size(0) + .build() + .await + .unwrap(); + let ts1: Vec<_> = (0..MIN_BATCH_SIZE as i64).collect(); + let ts2: Vec<_> = (MIN_BATCH_SIZE as i64..MIN_BATCH_SIZE as i64 * 2).collect(); + let seqs = vec![10; MIN_BATCH_SIZE]; + let op_types = vec![OpType::Put; MIN_BATCH_SIZE]; + let fields = vec![100; MIN_BATCH_SIZE]; + check_reader_result( + &mut reader, + &[ + new_batch(b"k1", &ts1, &seqs, &op_types, &fields), + new_batch(b"k1", &ts2, &seqs, &op_types, &fields), + ], + ) + .await; + } + + #[tokio::test] + async fn test_merge_more_than_batch_size_overlapping() { + let reader1 = VecBatchReader::new(&[new_batch( + b"k1", + &[1, 2, 3, 4, 5, 6, 7, 8, 9], + &[11, 10, 11, 10, 11, 10, 11, 10, 11], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[21, 22, 23, 24, 25, 26, 27, 28, 29], + )]); + let reader2 = VecBatchReader::new(&[new_batch( + b"k1", + &[1, 2, 3, 4, 5, 6, 7, 8, 9], + &[10, 11, 10, 11, 10, 11, 10, 11, 10], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[31, 32, 33, 34, 35, 36, 37, 38, 39], + )]); + let mut 
reader = MergeReaderBuilder::new() + .push_batch_iter(Box::new(reader1)) + .push_batch_reader(Box::new(reader2)) + .batch_size(3) + .build() + .await + .unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch( + b"k1", + &[1, 2, 3], + &[11, 11, 11], + &[OpType::Put, OpType::Put, OpType::Put], + &[21, 32, 23], + ), + new_batch( + b"k1", + &[4, 5, 6], + &[11, 11, 11], + &[OpType::Put, OpType::Put, OpType::Put], + &[34, 25, 36], + ), + new_batch( + b"k1", + &[7, 8, 9], + &[11, 11, 11], + &[OpType::Put, OpType::Put, OpType::Put], + &[27, 38, 29], + ), + ], + ) + .await; + } + #[test] fn test_batch_merger_empty() { let mut merger = BatchMerger::new(); - assert!(merger.merge_batches().unwrap().is_empty()); + assert!(merger.is_empty()); + assert!(merger.merge_batches().unwrap().is_none()); + assert!(merger.primary_key().is_none()); } #[test] - fn test_batch_merger_unsorted() { + fn test_merge_one_batch() { let mut merger = BatchMerger::new(); - merger.push(new_batch( - b"k1", - &[1, 3, 5], - &[10, 10, 10], - &[OpType::Put, OpType::Put, OpType::Put], - &[21, 23, 25], - )); - assert!(merger.is_sorted); - merger.push(new_batch( - b"k1", - &[2, 4], - &[11, 11], - &[OpType::Put, OpType::Put], - &[22, 24], - )); - assert!(!merger.is_sorted); - let batches = merger.merge_batches().unwrap(); - let batch = Batch::concat(batches.into_iter().collect()).unwrap(); + let expect = new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]); + merger.push(expect.clone()).unwrap(); + let batch = merger.merge_batches().unwrap().unwrap(); + assert_eq!(1, batch.num_rows()); + assert_eq!(expect, batch,); + assert!(merger.is_empty()); + } + + #[test] + fn test_merge_batches() { + let mut merger = BatchMerger::new(); + merger + .push(new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21])) + .unwrap(); + assert_eq!(1, merger.num_rows()); + assert!(!merger.is_empty()); + merger + .push(new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22])) + .unwrap(); + assert_eq!(2, merger.num_rows()); + let 
batch = merger.merge_batches().unwrap().unwrap(); + assert_eq!(2, batch.num_rows()); assert_eq!( batch, new_batch( b"k1", - &[1, 2, 3, 4, 5], - &[10, 11, 10, 11, 10], - &[ - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put - ], - &[21, 22, 23, 24, 25] + &[1, 2], + &[10, 10], + &[OpType::Put, OpType::Put,], + &[21, 22] ) ); - assert!(merger.is_sorted); - } - - #[test] - fn test_batch_merger_unsorted_by_heap() { - let mut merger = BatchMerger::new(); - merger.push(new_batch( - b"k1", - &[1, 3, 5], - &[10, 10, 10], - &[OpType::Put, OpType::Put, OpType::Put], - &[21, 23, 25], - )); - assert!(merger.is_sorted); - merger.push(new_batch( - b"k1", - &[2, 4], - &[11, 11], - &[OpType::Put, OpType::Put], - &[22, 24], - )); - assert!(!merger.is_sorted); - let batches = merger.merge_batches().unwrap(); - let batch = Batch::concat(batches.into_iter().collect()).unwrap(); - assert_eq!( - batch, - new_batch( - b"k1", - &[1, 2, 3, 4, 5], - &[10, 11, 10, 11, 10], - &[ - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put - ], - &[21, 22, 23, 24, 25] - ) - ); - assert!(merger.is_sorted); + assert!(merger.is_empty()); } } diff --git a/src/storage/src/read/merge.rs b/src/storage/src/read/merge.rs index 713cb038d8..d27d05b47b 100644 --- a/src/storage/src/read/merge.rs +++ b/src/storage/src/read/merge.rs @@ -582,7 +582,9 @@ impl MergeReader { // Now key range of this node is behind the hottest node's. node.is_behind(hottest) } else { - false + // Setting this to false should not affect correctness but performance because + // `refill_hot()` ensures the hottest node is correct. + true }; if node_is_cold {