From 214ffe71092d370884738009fdb8b7a16eb00d2d Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Fri, 8 Aug 2025 15:03:22 +0800
Subject: [PATCH] feat: Implements an iterator to merge RecordBatches (#6666)

* feat: Implements the sync merge iterator for RecordBatch

Signed-off-by: evenyag

* test: test merge iterator

Signed-off-by: evenyag

* fix: fix incorrect column index

Signed-off-by: evenyag

* style: fix clippy

Signed-off-by: evenyag

* chore: fix typos

Signed-off-by: evenyag

---------

Signed-off-by: evenyag
---
 src/datatypes/src/timestamp.rs           |   1 +
 src/mito2/src/read.rs                    |   1 +
 src/mito2/src/read/flat_merge.rs         | 943 +++++++++++++++++++++++
 src/mito2/src/sst/parquet/flat_format.rs |  10 +
 4 files changed, 955 insertions(+)
 create mode 100644 src/mito2/src/read/flat_merge.rs

diff --git a/src/datatypes/src/timestamp.rs b/src/datatypes/src/timestamp.rs
index 6d699d3d56..0c1685eb1c 100644
--- a/src/datatypes/src/timestamp.rs
+++ b/src/datatypes/src/timestamp.rs
@@ -143,6 +143,7 @@ define_timestamp_with_unit!(Millisecond);
 define_timestamp_with_unit!(Microsecond);
 define_timestamp_with_unit!(Nanosecond);
 
+/// Converts a timestamp array to a primitive array and the time unit.
 pub fn timestamp_array_to_primitive(
     ts_array: &ArrayRef,
 ) -> Option<(
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index de73d9ce85..02dd0f1f7f 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -16,6 +16,7 @@
 
 pub mod compat;
 pub mod dedup;
+pub mod flat_merge;
 pub mod last_row;
 pub mod merge;
 pub mod plain_batch;
diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs
new file mode 100644
index 0000000000..10238a7c75
--- /dev/null
+++ b/src/mito2/src/read/flat_merge.rs
@@ -0,0 +1,943 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::sync::Arc;
+
+use datatypes::arrow::array::{Int64Array, UInt64Array};
+use datatypes::arrow::compute::interleave;
+use datatypes::arrow::datatypes::SchemaRef;
+use datatypes::arrow::record_batch::RecordBatch;
+use datatypes::arrow_array::BinaryArray;
+use datatypes::timestamp::timestamp_array_to_primitive;
+use snafu::ResultExt;
+use store_api::storage::SequenceNumber;
+
+use crate::error::{ComputeArrowSnafu, Result};
+use crate::memtable::BoxedRecordBatchIterator;
+use crate::sst::parquet::flat_format::{
+    primary_key_column_index, sequence_column_index, time_index_column_index,
+};
+use crate::sst::parquet::format::PrimaryKeyArray;
+
+/// Keeps track of the current position in a batch.
+#[derive(Debug, Copy, Clone, Default)]
+struct BatchCursor {
+    /// The index into BatchBuilder::batches
+    batch_idx: usize,
+    /// The row index within the given batch
+    row_idx: usize,
+}
+
+/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
+// Ports from https://github.com/apache/datafusion/blob/49.0.0/datafusion/physical-plan/src/sorts/builder.rs
+// Adds the `take_remaining_rows()` method.
+#[derive(Debug)]
+pub struct BatchBuilder {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+
+    /// Maintain a list of [`RecordBatch`] and their corresponding stream
+    batches: Vec<(usize, RecordBatch)>,
+
+    /// The current [`BatchCursor`] for each stream
+    cursors: Vec<BatchCursor>,
+
+    /// The accumulated stream indexes from which to pull rows
+    /// Consists of a tuple of `(batch_idx, row_idx)`
+    indices: Vec<(usize, usize)>,
+}
+
+impl BatchBuilder {
+    /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
+    pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
+        Self {
+            schema,
+            batches: Vec::with_capacity(stream_count * 2),
+            cursors: vec![BatchCursor::default(); stream_count],
+            indices: Vec::with_capacity(batch_size),
+        }
+    }
+
+    /// Append a new batch in `stream_idx`
+    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
+        let batch_idx = self.batches.len();
+        self.batches.push((stream_idx, batch));
+        self.cursors[stream_idx] = BatchCursor {
+            batch_idx,
+            row_idx: 0,
+        };
+    }
+
+    /// Append the next row from `stream_idx`
+    pub fn push_row(&mut self, stream_idx: usize) {
+        let cursor = &mut self.cursors[stream_idx];
+        let row_idx = cursor.row_idx;
+        cursor.row_idx += 1;
+        self.indices.push((cursor.batch_idx, row_idx));
+    }
+
+    /// Returns the number of in-progress rows in this [`BatchBuilder`]
+    pub fn len(&self) -> usize {
+        self.indices.len()
+    }
+
+    /// Returns `true` if this [`BatchBuilder`] contains no in-progress rows
+    pub fn is_empty(&self) -> bool {
+        self.indices.is_empty()
+    }
+
+    /// Returns the schema of this [`BatchBuilder`]
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
+    ///
+    /// Will then drop any batches for which all rows have been yielded to the output
+    ///
+    /// Returns `None` if no pending rows
+    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
+        if self.is_empty() {
+            return Ok(None);
+        }
+
+        let columns = (0..self.schema.fields.len())
+            .map(|column_idx| {
+                let arrays: Vec<_> = self
+                    .batches
+                    .iter()
+                    .map(|(_, batch)| batch.column(column_idx).as_ref())
+                    .collect();
+                interleave(&arrays, &self.indices).context(ComputeArrowSnafu)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        self.indices.clear();
+
+        // New cursors are only created once the previous cursor for the stream
+        // is finished. This means all remaining rows from all but the last batch
+        // for each stream have been yielded to the newly created record batch
+        //
+        // We can therefore drop all but the last batch for each stream
+        self.retain_batches();
+
+        RecordBatch::try_new(Arc::clone(&self.schema), columns)
+            .context(ComputeArrowSnafu)
+            .map(Some)
+    }
+
+    /// Slice and take remaining rows from the last batch of `stream_idx` and push
+    /// the next batch if available.
+    pub fn take_remaining_rows(
+        &mut self,
+        stream_idx: usize,
+        next: Option<RecordBatch>,
+    ) -> RecordBatch {
+        let cursor = &mut self.cursors[stream_idx];
+        let batch = &self.batches[cursor.batch_idx];
+        let output = batch
+            .1
+            .slice(cursor.row_idx, batch.1.num_rows() - cursor.row_idx);
+        cursor.row_idx = batch.1.num_rows();
+
+        if let Some(b) = next {
+            self.push_batch(stream_idx, b);
+            self.retain_batches();
+        }
+
+        output
+    }
+
+    fn retain_batches(&mut self) {
+        let mut batch_idx = 0;
+        let mut retained = 0;
+        self.batches.retain(|(stream_idx, _)| {
+            let stream_cursor = &mut self.cursors[*stream_idx];
+            let retain = stream_cursor.batch_idx == batch_idx;
+            batch_idx += 1;
+
+            if retain {
+                stream_cursor.batch_idx = retained;
+                retained += 1;
+            }
+            retain
+        });
+    }
+}
+
+/// A comparable node of the heap.
+trait NodeCmp: Eq + Ord {
+    /// Returns whether the node still has batches to read.
+    fn is_eof(&self) -> bool;
+
+    /// Returns true if the key range of the current batch in `self` is behind (exclusive) the
+    /// current batch in `other`.
+    ///
+    /// # Panics
+    /// Panics if either `self` or `other` is EOF.
+    fn is_behind(&self, other: &Self) -> bool;
+}
+
+/// Common algorithm for merging sorted batches from multiple nodes.
+struct MergeAlgo<T: NodeCmp> {
+    /// Holds nodes whose key range of the current batch **is** overlapped with the merge window.
+    /// Each node yields batches from a `source`.
+    ///
+    /// Nodes in this heap **MUST** not be empty. A `merge window` is the (primary key, timestamp)
+    /// range of the **root node** in the `hot` heap.
+    hot: BinaryHeap<T>,
+    /// Holds nodes whose key range of the current batch **isn't** overlapped with the merge window.
+    ///
+    /// Nodes in this heap **MUST** not be empty.
+    cold: BinaryHeap<T>,
+}
+
+impl<T: NodeCmp> MergeAlgo<T> {
+    /// Creates a new merge algorithm from `nodes`.
+    ///
+    /// All nodes must be initialized.
+    fn new(mut nodes: Vec<T>) -> Self {
+        // Skips EOF nodes.
+        nodes.retain(|node| !node.is_eof());
+        let hot = BinaryHeap::with_capacity(nodes.len());
+        let cold = BinaryHeap::from(nodes);
+
+        let mut algo = MergeAlgo { hot, cold };
+        // Initializes the algorithm.
+        algo.refill_hot();
+
+        algo
+    }
+
+    /// Moves nodes in the `cold` heap whose key ranges overlap with the current merge
+    /// window to the `hot` heap.
+    fn refill_hot(&mut self) {
+        while !self.cold.is_empty() {
+            if let Some(merge_window) = self.hot.peek() {
+                let warmest = self.cold.peek().unwrap();
+                if warmest.is_behind(merge_window) {
+                    // If the warmest node in the `cold` heap is totally after the
+                    // `merge_window`, then there is no need to add more nodes into
+                    // the `hot` heap for merge sorting.
+                    break;
+                }
+            }
+
+            let warmest = self.cold.pop().unwrap();
+            self.hot.push(warmest);
+        }
+    }
+
+    /// Pushes the node popped from `hot` back to a proper heap.
+    fn reheap(&mut self, node: T) {
+        if node.is_eof() {
+            // If the node is EOF, don't put it into the heap again.
+            // The merge window would be updated, need to refill the hot heap.
+            self.refill_hot();
+        } else {
+            // Find a proper heap for this node.
+            let node_is_cold = if let Some(hottest) = self.hot.peek() {
+                // If the key range of this node is behind the hottest node's, then we can
+                // push it to the cold heap. Otherwise we should push it to the hot heap.
+                node.is_behind(hottest)
+            } else {
+                // The hot heap is empty, but we don't know whether the current
+                // batch of this node is still the hottest.
+                true
+            };
+
+            if node_is_cold {
+                self.cold.push(node);
+            } else {
+                self.hot.push(node);
+            }
+            // Anyway, the merge window has been changed, we need to refill the hot heap.
+            self.refill_hot();
+        }
+    }
+
+    /// Pops the hottest node.
+    fn pop_hot(&mut self) -> Option<T> {
+        self.hot.pop()
+    }
+
+    /// Returns true if there are rows in the hot heap.
+    fn has_rows(&self) -> bool {
+        !self.hot.is_empty()
+    }
+
+    /// Returns true if we can fetch a batch directly instead of a row.
+    fn can_fetch_batch(&self) -> bool {
+        self.hot.len() == 1
+    }
+}
+
+// TODO(yingwen): Further downcast and store arrays in this struct.
+/// Columns to compare for a [RecordBatch].
+struct SortColumns {
+    primary_key: PrimaryKeyArray,
+    timestamp: Int64Array,
+    sequence: UInt64Array,
+}
+
+impl SortColumns {
+    /// Creates a new [SortColumns] from a [RecordBatch].
+    ///
+    /// # Panics
+    /// Panics if the input batch doesn't have correct internal columns.
+    fn new(batch: &RecordBatch) -> Self {
+        let num_columns = batch.num_columns();
+        let primary_key = batch
+            .column(primary_key_column_index(num_columns))
+            .as_any()
+            .downcast_ref::<PrimaryKeyArray>()
+            .unwrap()
+            .clone();
+        let timestamp = batch.column(time_index_column_index(num_columns));
+        let (timestamp, _unit) = timestamp_array_to_primitive(timestamp).unwrap();
+        let sequence = batch
+            .column(sequence_column_index(num_columns))
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .unwrap()
+            .clone();
+
+        Self {
+            primary_key,
+            timestamp,
+            sequence,
+        }
+    }
+
+    fn primary_key_at(&self, index: usize) -> &[u8] {
+        let key = self.primary_key.keys().value(index);
+        let binary_values = self
+            .primary_key
+            .values()
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        binary_values.value(key as usize)
+    }
+
+    fn timestamp_at(&self, index: usize) -> i64 {
+        self.timestamp.value(index)
+    }
+
+    fn sequence_at(&self, index: usize) -> SequenceNumber {
+        self.sequence.value(index)
+    }
+
+    fn num_rows(&self) -> usize {
+        self.timestamp.len()
+    }
+}
+
+/// Cursor to a row in the [RecordBatch].
+///
+/// It compares batches by rows. During comparison, it ignores op type as sequence is enough to
+/// distinguish different rows.
+struct RowCursor {
+    /// Current row offset.
+    offset: usize,
+    /// Keys of the batch.
+    columns: SortColumns,
+}
+
+impl RowCursor {
+    fn new(columns: SortColumns) -> Self {
+        debug_assert!(columns.num_rows() > 0);
+
+        Self { offset: 0, columns }
+    }
+
+    fn is_finished(&self) -> bool {
+        self.offset >= self.columns.num_rows()
+    }
+
+    fn advance(&mut self) {
+        self.offset += 1;
+    }
+
+    fn first_primary_key(&self) -> &[u8] {
+        self.columns.primary_key_at(self.offset)
+    }
+
+    fn first_timestamp(&self) -> i64 {
+        self.columns.timestamp_at(self.offset)
+    }
+
+    fn first_sequence(&self) -> SequenceNumber {
+        self.columns.sequence_at(self.offset)
+    }
+
+    fn last_primary_key(&self) -> &[u8] {
+        self.columns.primary_key_at(self.columns.num_rows() - 1)
+    }
+
+    fn last_timestamp(&self) -> i64 {
+        self.columns.timestamp_at(self.columns.num_rows() - 1)
+    }
+}
+
+impl PartialEq for RowCursor {
+    fn eq(&self, other: &Self) -> bool {
+        self.first_primary_key() == other.first_primary_key()
+            && self.first_timestamp() == other.first_timestamp()
+            && self.first_sequence() == other.first_sequence()
+    }
+}
+
+impl Eq for RowCursor {}
+
+impl PartialOrd for RowCursor {
+    fn partial_cmp(&self, other: &RowCursor) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for RowCursor {
+    /// Compares by primary key, time index, sequence desc.
+    fn cmp(&self, other: &RowCursor) -> Ordering {
+        self.first_primary_key()
+            .cmp(other.first_primary_key())
+            .then_with(|| self.first_timestamp().cmp(&other.first_timestamp()))
+            .then_with(|| other.first_sequence().cmp(&self.first_sequence()))
+    }
+}
+
+/// Iterator to merge multiple sorted iterators into a single sorted iterator.
+///
+/// All iterators must be sorted by primary key, time index, sequence desc.
+pub struct MergeIterator {
+    /// The merge algorithm to maintain heaps.
+    algo: MergeAlgo<IterNode>,
+    /// Current buffered rows to output.
+    in_progress: BatchBuilder,
+    /// Non-empty batch to output.
+    output_batch: Option<RecordBatch>,
+    /// Batch size to merge rows.
+    /// This is not a hard limit, the iterator may return smaller batches to avoid concatenating
+    /// rows.
+    batch_size: usize,
+}
+
+impl MergeIterator {
+    /// Creates a new iterator to merge sorted `iters`.
+    pub fn new(
+        schema: SchemaRef,
+        iters: Vec<BoxedRecordBatchIterator>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
+        let mut nodes = Vec::with_capacity(iters.len());
+        // Initialize nodes and the buffer.
+        for (node_index, iter) in iters.into_iter().enumerate() {
+            let mut node = IterNode {
+                node_index,
+                iter,
+                cursor: None,
+            };
+            if let Some(batch) = node.advance_batch()? {
+                in_progress.push_batch(node_index, batch);
+                nodes.push(node);
+            }
+        }
+
+        let algo = MergeAlgo::new(nodes);
+
+        Ok(Self {
+            algo,
+            in_progress,
+            output_batch: None,
+            batch_size,
+        })
+    }
+
+    /// Fetches the next sorted batch.
+    pub fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
+        while self.algo.has_rows() && self.output_batch.is_none() {
+            if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
+                // Only one batch in the hot heap, but we have pending rows, output the pending rows first.
+                self.output_batch = self.in_progress.build_record_batch()?;
+                debug_assert!(self.output_batch.is_some());
+            } else if self.algo.can_fetch_batch() {
+                self.fetch_batch_from_hottest()?;
+            } else {
+                self.fetch_row_from_hottest()?;
+            }
+        }
+
+        if let Some(batch) = self.output_batch.take() {
+            Ok(Some(batch))
+        } else {
+            // No more batches.
+            Ok(None)
+        }
+    }
+
+    /// Fetches a batch from the hottest node.
+    fn fetch_batch_from_hottest(&mut self) -> Result<()> {
+        debug_assert!(self.in_progress.is_empty());
+
+        // Safety: next_batch() ensures the heap is not empty.
+        let mut hottest = self.algo.pop_hot().unwrap();
+        debug_assert!(!hottest.current_cursor().is_finished());
+        let next = hottest.advance_batch()?;
+        // The node in the heap is not empty, so it must have existing rows in the builder.
+        let batch = self
+            .in_progress
+            .take_remaining_rows(hottest.node_index, next);
+        Self::maybe_output_batch(batch, &mut self.output_batch);
+        self.algo.reheap(hottest);
+
+        Ok(())
+    }
+
+    /// Fetches a row from the hottest node.
+    fn fetch_row_from_hottest(&mut self) -> Result<()> {
+        // Safety: next_batch() ensures the heap has more than 1 element.
+        let mut hottest = self.algo.pop_hot().unwrap();
+        debug_assert!(!hottest.current_cursor().is_finished());
+        self.in_progress.push_row(hottest.node_index);
+        if self.in_progress.len() >= self.batch_size {
+            // We buffered enough rows.
+            if let Some(output) = self.in_progress.build_record_batch()? {
+                Self::maybe_output_batch(output, &mut self.output_batch);
+            }
+        }
+
+        if let Some(next) = hottest.advance_row()? {
+            self.in_progress.push_batch(hottest.node_index, next);
+        }
+
+        self.algo.reheap(hottest);
+        Ok(())
+    }
+
+    /// Adds the batch to the output batch if it is not empty.
+    fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
+        debug_assert!(output_batch.is_none());
+        if batch.num_rows() > 0 {
+            *output_batch = Some(batch);
+        }
+    }
+}
+
+/// A sync node in the merge iterator.
+struct IterNode {
+    /// Index of the node.
+    node_index: usize,
+    /// Iterator of this `Node`.
+    iter: BoxedRecordBatchIterator,
+    /// Current batch to be read. The node should ensure the batch is not empty (the
+    /// cursor is not finished).
+    ///
+    /// `None` means the `iter` has reached EOF.
+    cursor: Option<RowCursor>,
+}
+
+impl IterNode {
+    /// Returns the current cursor.
+    ///
+    /// # Panics
+    /// Panics if the node has reached EOF.
+    fn current_cursor(&self) -> &RowCursor {
+        self.cursor.as_ref().unwrap()
+    }
+
+    /// Fetches a new batch from the iter and updates the cursor,
+    /// replacing the current batch.
+    /// Returns the fetched new batch.
+    fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
+        let batch = self.advance_inner_iter()?;
+        let columns = batch.as_ref().map(SortColumns::new);
+        self.cursor = columns.map(RowCursor::new);
+
+        Ok(batch)
+    }
+
+    /// Skips one row.
+    /// Returns the next batch if the current batch is finished.
+    fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
+        let cursor = self.cursor.as_mut().unwrap();
+        cursor.advance();
+        if !cursor.is_finished() {
+            return Ok(None);
+        }
+
+        // Finished current batch, need to fetch a new batch.
+        self.advance_batch()
+    }
+
+    /// Fetches a non-empty batch from the iter.
+    fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
+        while let Some(batch) = self.iter.next().transpose()? {
+            if batch.num_rows() > 0 {
+                return Ok(Some(batch));
+            }
+        }
+        Ok(None)
+    }
+}
+
+impl NodeCmp for IterNode {
+    fn is_eof(&self) -> bool {
+        self.cursor.is_none()
+    }
+
+    fn is_behind(&self, other: &Self) -> bool {
+        debug_assert!(!self.current_cursor().is_finished());
+        debug_assert!(!other.current_cursor().is_finished());
+
+        // We only compare pk and timestamp so nodes in the cold
+        // heap don't have overlapping timestamps with the hottest node
+        // in the hot heap.
+        self.current_cursor()
+            .first_primary_key()
+            .cmp(other.current_cursor().last_primary_key())
+            .then_with(|| {
+                self.current_cursor()
+                    .first_timestamp()
+                    .cmp(&other.current_cursor().last_timestamp())
+            })
+            == Ordering::Greater
+    }
+}
+
+impl PartialEq for IterNode {
+    fn eq(&self, other: &IterNode) -> bool {
+        self.cursor == other.cursor
+    }
+}
+
+impl Eq for IterNode {}
+
+impl PartialOrd for IterNode {
+    fn partial_cmp(&self, other: &IterNode) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for IterNode {
+    fn cmp(&self, other: &IterNode) -> Ordering {
+        // The std binary heap is a max heap, but we want the nodes ordered in
+        // ascending order, so we compare them in reverse order.
+        other.cursor.cmp(&self.cursor)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use api::v1::OpType;
+    use datatypes::arrow::array::builder::BinaryDictionaryBuilder;
+    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
+    use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit, UInt32Type};
+    use datatypes::arrow::record_batch::RecordBatch;
+
+    use super::*;
+
+    /// Creates a test RecordBatch with the specified data.
+    fn create_test_record_batch(
+        primary_keys: &[&[u8]],
+        timestamps: &[i64],
+        sequences: &[u64],
+        op_types: &[OpType],
+        field_values: &[i64],
+    ) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("field1", DataType::Int64, false),
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new(
+                "__primary_key",
+                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
+                false,
+            ),
+            Field::new("__sequence", DataType::UInt64, false),
+            Field::new("__op_type", DataType::UInt8, false),
+        ]));
+
+        let field1 = Arc::new(Int64Array::from_iter_values(field_values.iter().copied()));
+        let timestamp = Arc::new(TimestampMillisecondArray::from_iter_values(
+            timestamps.iter().copied(),
+        ));
+
+        // Create primary key dictionary array using BinaryDictionaryBuilder
+        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
+        for &key in primary_keys {
+            builder.append(key).unwrap();
+        }
+        let primary_key = Arc::new(builder.finish());
+
+        let sequence = Arc::new(UInt64Array::from_iter_values(sequences.iter().copied()));
+        let op_type = Arc::new(UInt8Array::from_iter_values(
+            op_types.iter().map(|&v| v as u8),
+        ));
+
+        RecordBatch::try_new(
+            schema,
+            vec![field1, timestamp, primary_key, sequence, op_type],
+        )
+        .unwrap()
+    }
+
+    fn new_test_iter(batches: Vec<RecordBatch>) -> BoxedRecordBatchIterator {
+        Box::new(batches.into_iter().map(Ok))
+    }
+
+    /// Helper function to check if two record batch lists are equivalent.
+    fn assert_record_batches_eq(expected: &[RecordBatch], actual: &[RecordBatch]) {
+        assert_eq!(expected.len(), actual.len());
+        for (exp, act) in expected.iter().zip(actual.iter()) {
+            assert_eq!(exp, act);
+        }
+    }
+
+    /// Helper function to collect all batches from a MergeIterator.
+    fn collect_merge_iterator_batches(
+        mut iter: MergeIterator,
+    ) -> crate::error::Result<Vec<RecordBatch>> {
+        let mut batches = Vec::new();
+        while let Some(batch) = iter.next_batch()? {
+            batches.push(batch);
+        }
+        Ok(batches)
+    }
+
+    #[test]
+    fn test_merge_iterator_empty() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("field1", DataType::Int64, false),
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new(
+                "__primary_key",
+                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
+                false,
+            ),
+            Field::new("__sequence", DataType::UInt64, false),
+            Field::new("__op_type", DataType::UInt8, false),
+        ]));
+
+        let mut merge_iter = MergeIterator::new(schema, vec![], 1024).unwrap();
+        assert!(merge_iter.next_batch().unwrap().is_none());
+    }
+
+    #[test]
+    fn test_merge_iterator_single_batch() {
+        let batch = create_test_record_batch(
+            &[b"k1", b"k1"],
+            &[1000, 2000],
+            &[21, 22],
+            &[OpType::Put, OpType::Put],
+            &[11, 12],
+        );
+
+        let schema = batch.schema();
+        let iter = Box::new(new_test_iter(vec![batch.clone()]));
+
+        let merge_iter = MergeIterator::new(schema, vec![iter], 1024).unwrap();
+        let result = collect_merge_iterator_batches(merge_iter).unwrap();
+
+        assert_eq!(result.len(), 1);
+        assert_record_batches_eq(&[batch], &result);
+    }
+
+    #[test]
+    fn test_merge_iterator_non_overlapping() {
+        let batch1 = create_test_record_batch(
+            &[b"k1", b"k1"],
+            &[1000, 2000],
+            &[21, 22],
+            &[OpType::Put, OpType::Put],
+            &[11, 12],
+        );
+        let batch2 = create_test_record_batch(
+            &[b"k1", b"k1"],
+            &[4000, 5000],
+            &[24, 25],
+            &[OpType::Put, OpType::Put],
+            &[14, 15],
+        );
+        let batch3 = create_test_record_batch(
+            &[b"k2", b"k2"],
+            &[2000, 3000],
+            &[22, 23],
+            &[OpType::Delete, OpType::Put],
+            &[12, 13],
+        );
+
+        let schema = batch1.schema();
+        let iter1 = Box::new(new_test_iter(vec![batch1.clone(), batch3.clone()]));
+        let iter2 = Box::new(new_test_iter(vec![batch2.clone()]));
+
+        let merge_iter = MergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
+        let result = collect_merge_iterator_batches(merge_iter).unwrap();
+
+        // Results should be sorted by primary key, timestamp, sequence desc
+        let expected = vec![batch1, batch2, batch3];
+        assert_record_batches_eq(&expected, &result);
+    }
+
+    #[test]
+    fn test_merge_iterator_overlapping_timestamps() {
+        // Create batches with overlapping timestamps but different sequences
+        let batch1 = create_test_record_batch(
+            &[b"k1", b"k1"],
+            &[1000, 2000],
+            &[21, 22],
+            &[OpType::Put, OpType::Put],
+            &[11, 12],
+        );
+        let batch2 = create_test_record_batch(
+            &[b"k1", b"k1"],
+            &[1500, 2500],
+            &[31, 32],
+            &[OpType::Put, OpType::Put],
+            &[15, 25],
+        );
+
+        let schema = batch1.schema();
+        let iter1 = Box::new(new_test_iter(vec![batch1]));
+        let iter2 = Box::new(new_test_iter(vec![batch2]));
+
+        let merge_iter = MergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
+        let result = collect_merge_iterator_batches(merge_iter).unwrap();
+
+        let expected = vec![
+            create_test_record_batch(
+                &[b"k1", b"k1"],
+                &[1000, 1500],
+                &[21, 31],
+                &[OpType::Put, OpType::Put],
+                &[11, 15],
+            ),
+            create_test_record_batch(&[b"k1"], &[2000], &[22], &[OpType::Put], &[12]),
+            create_test_record_batch(&[b"k1"], &[2500], &[32], &[OpType::Put], &[25]),
+        ];
+        assert_record_batches_eq(&expected, &result);
+    }
+
+    #[test]
+    fn test_merge_iterator_duplicate_keys_sequences() {
+        // Test with same primary key and timestamp but different sequences
+        let batch1 = create_test_record_batch(
+            &[b"k1", b"k1"],
+            &[1000, 1000],
+            &[20, 10],
+            &[OpType::Put, OpType::Put],
+            &[1, 2],
+        );
+        let batch2 = create_test_record_batch(
+            &[b"k1"],
+            &[1000],
+            &[15], // Middle sequence
+            &[OpType::Put],
+            &[3],
+        );
+
+        let schema = batch1.schema();
+        let iter1 = Box::new(new_test_iter(vec![batch1]));
+        let iter2 = Box::new(new_test_iter(vec![batch2]));
+
+        let merge_iter = MergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
+        let result = collect_merge_iterator_batches(merge_iter).unwrap();
+
+        // Should be sorted by sequence descending for same key/timestamp
+        let expected = vec![
+            create_test_record_batch(
+                &[b"k1", b"k1"],
+                &[1000, 1000],
+                &[20, 15],
+                &[OpType::Put, OpType::Put],
+                &[1, 3],
+            ),
+            create_test_record_batch(&[b"k1"], &[1000], &[10], &[OpType::Put], &[2]),
+        ];
+        assert_record_batches_eq(&expected, &result);
+    }
+
+    #[test]
+    fn test_batch_builder_basic() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("field1", DataType::Int64, false),
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+        ]));
+
+        let mut builder = BatchBuilder::new(schema.clone(), 2, 1024);
+        assert!(builder.is_empty());
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Int64Array::from(vec![1, 2])),
+                Arc::new(TimestampMillisecondArray::from(vec![1000, 2000])),
+            ],
+        )
+        .unwrap();
+
+        builder.push_batch(0, batch);
+        builder.push_row(0);
+        builder.push_row(0);
+
+        assert!(!builder.is_empty());
+        assert_eq!(builder.len(), 2);
+
+        let result_batch = builder.build_record_batch().unwrap().unwrap();
+        assert_eq!(result_batch.num_rows(), 2);
+    }
+
+    #[test]
+    fn test_row_cursor_comparison() {
+        // Create test batches for cursor comparison
+        let batch1 = create_test_record_batch(
+            &[b"k1", b"k1"],
+            &[1000, 2000],
+            &[22, 21],
+            &[OpType::Put, OpType::Put],
+            &[11, 12],
+        );
+        let batch2 = create_test_record_batch(
+            &[b"k1", b"k1"],
+            &[1000, 2000],
+            &[23, 20], // Different sequences
+            &[OpType::Put, OpType::Put],
+            &[11, 12],
+        );
+
+        let columns1 = SortColumns::new(&batch1);
+        let columns2 = SortColumns::new(&batch2);
+
+        let cursor1 = RowCursor::new(columns1);
+        let cursor2 = RowCursor::new(columns2);
+
+        // Cursors with the same pk and timestamp should be ordered by sequence desc:
+        // cursor1 has sequence 22, cursor2 has sequence 23, so cursor2 < cursor1
+        // (the higher sequence comes first).
+        assert!(cursor2 < cursor1);
+    }
+}
diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs
index 59cbd341e8..6fb587f367 100644
--- a/src/mito2/src/sst/parquet/flat_format.rs
+++ b/src/mito2/src/sst/parquet/flat_format.rs
@@ -104,6 +104,16 @@ pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
     num_columns - 2
 }
 
+/// Returns the position of the time index column.
+pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
+    num_columns - 4
+}
+
+/// Returns the position of the primary key column.
+pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
+    num_columns - 3
+}
+
 // TODO(yingwen): Add an option to skip reading internal columns.
 /// Helper for reading the flat SST format with projection.
 ///
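
Usage sketch for review context (illustrative only, not part of the commit): merging two pre-sorted sources with the new MergeIterator. The merge_two_sources helper and its inputs are hypothetical; only MergeIterator::new and next_batch come from this patch.

use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;

use crate::error::Result;
use crate::memtable::BoxedRecordBatchIterator;
use crate::read::flat_merge::MergeIterator;

/// Merges two sorted sources into a single, globally sorted list of batches.
/// Each input must already be sorted by (primary key, time index, sequence desc).
fn merge_two_sources(
    schema: SchemaRef,
    left: BoxedRecordBatchIterator,
    right: BoxedRecordBatchIterator,
) -> Result<Vec<RecordBatch>> {
    // 1024 is a soft batch size: the iterator may emit smaller batches
    // instead of concatenating rows across source batches.
    let mut merge_iter = MergeIterator::new(schema, vec![left, right], 1024)?;
    let mut merged = Vec::new();
    // Batches are yielded in (primary key, time index, sequence desc) order.
    while let Some(batch) = merge_iter.next_batch()? {
        merged.push(batch);
    }
    Ok(merged)
}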