diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs
index db6f50418e..08c89abf3e 100644
--- a/src/mito2/benches/memtable_bench.rs
+++ b/src/mito2/benches/memtable_bench.rs
@@ -27,6 +27,7 @@ use mito2::memtable::bulk::part_reader::BulkPartRecordBatchIter;
 use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
 use mito2::memtable::time_series::TimeSeriesMemtable;
 use mito2::memtable::{KeyValues, Memtable};
+use mito2::read::flat_merge::FlatMergeIterator;
 use mito2::region::options::MergeMode;
 use mito2::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
 use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
@@ -423,6 +424,70 @@ fn bulk_part_converter(c: &mut Criterion) {
     }
 }
 
+fn flat_merge_iterator_bench(c: &mut Criterion) {
+    let metadata = Arc::new(cpu_metadata());
+    let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
+    let start_sec = 1710043200;
+
+    let mut group = c.benchmark_group("flat_merge_iterator");
+    group.sample_size(10);
+
+    for &num_parts in &[8, 16, 32, 64, 128, 256, 512] {
+        // Pre-create BulkParts with different timestamps but same hosts (1024)
+        let mut bulk_parts = Vec::with_capacity(num_parts);
+        let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
+
+        for part_idx in 0..num_parts {
+            let generator = CpuDataGenerator::new(
+                metadata.clone(),
+                1024, // 1024 hosts per part
+                start_sec + part_idx as i64 * 10, // Different timestamps for each part
+                start_sec + part_idx as i64 * 10 + 1,
+            );
+
+            let mut converter =
+                BulkPartConverter::new(&metadata, schema.clone(), 1024, codec.clone(), true);
+            if let Some(kvs) = generator.iter().next() {
+                converter.append_key_values(&kvs).unwrap();
+            }
+            let bulk_part = converter.convert().unwrap();
+            bulk_parts.push(bulk_part);
+        }
+
+        // Pre-create BulkIterContext
+        let context = Arc::new(BulkIterContext::new(
+            metadata.clone(),
+            &None, // No projection
+            None,  // No predicate
+        ));
+
+        group.bench_with_input(
+            format!("{}_parts_1024_hosts", num_parts),
+            &num_parts,
+            |b, _| {
+                b.iter(|| {
+                    // Create iterators from BulkParts
+                    let mut iters = Vec::with_capacity(num_parts);
+                    for bulk_part in &bulk_parts {
+                        let iter = BulkPartRecordBatchIter::new(
+                            bulk_part.batch.clone(),
+                            context.clone(),
+                            None, // No sequence filter
+                        );
+                        iters.push(Box::new(iter) as _);
+                    }
+
+                    // Create and consume FlatMergeIterator
+                    let merge_iter = FlatMergeIterator::new(schema.clone(), iters, 1024).unwrap();
+                    for batch_result in merge_iter {
+                        let _batch = batch_result.unwrap();
+                    }
+                });
+            },
+        );
+    }
+}
+
 fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
     let metadata = Arc::new(cpu_metadata());
     let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
@@ -498,6 +563,7 @@ criterion_group!(
     full_scan,
     filter_1_host,
     bulk_part_converter,
-    bulk_part_record_batch_iter_filter
+    bulk_part_record_batch_iter_filter,
+    flat_merge_iterator_bench
 );
 criterion_main!(benches);
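
For reference, the new group can be run on its own via Criterion's name filter, e.g. `cargo bench --bench memtable_bench flat_merge_iterator` (assuming the bench target keeps its current name). Each case builds 8 to 512 bulk parts that share the same 1024 hosts but cover disjoint time ranges, and part construction happens outside the timed closure, so the measured work is the merge itself across many sorted inputs.
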
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 76e43a41f3..bb79ea7503 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -43,6 +43,7 @@ use datafusion_common::arrow::array::UInt8Array;
 use datatypes::arrow;
 use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
 use datatypes::arrow::compute::SortOptions;
+use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::arrow::row::{RowConverter, SortField};
 use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
 use datatypes::scalars::ScalarVectorBuilder;
@@ -1014,6 +1015,9 @@ pub type BoxedBatchReader = Box<dyn BatchReader>;
 /// Pointer to a stream that yields [Batch].
 pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
 
+/// Pointer to a stream that yields [RecordBatch].
+pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
+
 #[async_trait::async_trait]
 impl BatchReader for Box<dyn BatchReader> {
     async fn next_batch(&mut self) -> Result<Option<Batch>> {
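
As context for the new alias: any `'static` stream of `Result<RecordBatch>` can be erased behind `BoxedRecordBatchStream` with `StreamExt::boxed` from `futures`. A minimal sketch (the helper and its in-memory input are hypothetical, written from inside the crate):

    use datatypes::arrow::record_batch::RecordBatch;
    use futures::stream::{self, StreamExt};

    use crate::error::Result;
    use crate::read::BoxedRecordBatchStream;

    // Hypothetical helper: wrap an in-memory sequence of batch results as a
    // boxed stream so it can feed the async readers introduced below.
    fn to_boxed_stream(batches: Vec<Result<RecordBatch>>) -> BoxedRecordBatchStream {
        stream::iter(batches).boxed()
    }
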
diff --git a/src/mito2/src/read/flat_dedup.rs b/src/mito2/src/read/flat_dedup.rs
index aa9b232824..5b5120e4a4 100644
--- a/src/mito2/src/read/flat_dedup.rs
+++ b/src/mito2/src/read/flat_dedup.rs
@@ -17,6 +17,7 @@ use std::ops::Range;
 
 use api::v1::OpType;
+use async_stream::try_stream;
 use datatypes::arrow::array::{
     make_comparator, Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder,
     UInt64Array, UInt8Array,
@@ -30,6 +31,7 @@ use datatypes::arrow::compute::{
 };
 use datatypes::arrow::error::ArrowError;
 use datatypes::arrow::record_batch::RecordBatch;
+use futures::{Stream, TryStreamExt};
 use snafu::ResultExt;
 
 use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result};
@@ -41,13 +43,13 @@ use crate::sst::parquet::flat_format::{
 use crate::sst::parquet::format::{PrimaryKeyArray, FIXED_POS_COLUMN_NUM};
 
 /// An iterator to dedup sorted batches from an iterator based on the dedup strategy.
-pub struct DedupIterator<I, S> {
+pub struct FlatDedupIterator<I, S> {
     iter: I,
     strategy: S,
     metrics: DedupMetrics,
 }
 
-impl<I, S> DedupIterator<I, S> {
+impl<I, S> FlatDedupIterator<I, S> {
     /// Creates a new dedup iterator.
     pub fn new(iter: I, strategy: S) -> Self {
         Self {
@@ -58,7 +60,7 @@ impl<I, S> DedupIterator<I, S> {
     }
 }
 
-impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> DedupIterator<I, S> {
+impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> FlatDedupIterator<I, S> {
     /// Returns the next deduplicated batch.
     fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
         while let Some(batch) = self.iter.next().transpose()? {
@@ -72,7 +74,7 @@ impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Dedup
 }
 
 impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Iterator
-    for DedupIterator<I, S>
+    for FlatDedupIterator<I, S>
 {
     type Item = Result<RecordBatch>;
 
@@ -81,6 +83,46 @@ impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Itera
     }
 }
 
+/// An async reader to dedup sorted record batches from a stream based on the dedup strategy.
+pub struct DedupReader<I, S> {
+    stream: I,
+    strategy: S,
+    metrics: DedupMetrics,
+}
+
+impl<I, S> DedupReader<I, S> {
+    /// Creates a new dedup reader.
+    pub fn new(stream: I, strategy: S) -> Self {
+        Self {
+            stream,
+            strategy,
+            metrics: DedupMetrics::default(),
+        }
+    }
+}
+
+impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy> DedupReader<I, S> {
+    /// Returns the next deduplicated batch.
+    async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
+        while let Some(batch) = self.stream.try_next().await? {
+            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
+                return Ok(Some(batch));
+            }
+        }
+
+        self.strategy.finish(&mut self.metrics)
+    }
+
+    /// Converts the reader into a stream.
+    pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
+        try_stream! {
+            while let Some(batch) = self.fetch_next_batch().await? {
+                yield batch;
+            }
+        }
+    }
+}
+
 /// Strategy to remove duplicate rows from sorted record batches.
 pub trait RecordBatchDedupStrategy: Send {
     /// Pushes a batch to the dedup strategy.
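
A minimal sketch of how the async reader is meant to be driven (the `input` stream and the driver function are hypothetical; `DedupReader`, `into_stream`, and `FlatLastRow` are the names used in this file and its tests below):

    use datatypes::arrow::record_batch::RecordBatch;
    use futures::TryStreamExt;

    use crate::error::Result;
    use crate::read::flat_dedup::{DedupReader, FlatLastRow};
    use crate::read::BoxedRecordBatchStream;

    // Hypothetical driver: dedup an already-sorted stream, keeping the last
    // row per key and dropping deleted rows, then drain the output.
    async fn dedup_last_row(input: BoxedRecordBatchStream) -> Result<Vec<RecordBatch>> {
        let reader = DedupReader::new(input, FlatLastRow::new(true));
        // Pin the generated stream so `try_next` (which needs Unpin) works.
        let mut stream = Box::pin(reader.into_stream());
        let mut batches = Vec::new();
        while let Some(batch) = stream.try_next().await? {
            batches.push(batch);
        }
        Ok(batches)
    }
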
@@ -826,7 +868,7 @@ mod tests {
 
         // Test with filter_deleted = true
         let iter = input.clone().into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastRow::new(true));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&input, &result);
         assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
@@ -834,7 +876,7 @@ mod tests {
 
         // Test with filter_deleted = false
         let iter = input.clone().into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastRow::new(false));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&input, &result);
         assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
@@ -890,7 +932,7 @@ mod tests {
         ];
 
         let iter = input.clone().into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastRow::new(true));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&expected_filter_deleted, &result);
         assert_eq!(5, dedup_iter.metrics.num_unselected_rows);
@@ -923,7 +965,7 @@ mod tests {
         ];
 
         let iter = input.clone().into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastRow::new(false));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&expected_no_filter, &result);
         assert_eq!(3, dedup_iter.metrics.num_unselected_rows);
@@ -952,7 +994,7 @@ mod tests {
 
         // Test with filter_deleted = true
         let iter = input.clone().into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&input, &result);
         assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
@@ -960,7 +1002,7 @@ mod tests {
 
         // Test with filter_deleted = false
         let iter = input.clone().into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, false));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&input, &result);
         assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
@@ -1059,7 +1101,7 @@ mod tests {
         ];
 
         let iter = input.clone().into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&expected_filter_deleted, &result);
         assert_eq!(6, dedup_iter.metrics.num_unselected_rows);
@@ -1105,7 +1147,7 @@ mod tests {
         ];
 
         let iter = input.clone().into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, false));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&expected_no_filter, &result);
         assert_eq!(4, dedup_iter.metrics.num_unselected_rows);
@@ -1156,7 +1198,7 @@ mod tests {
         ];
 
         let iter = input.into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&expected, &result);
         assert_eq!(2, dedup_iter.metrics.num_unselected_rows);
@@ -1214,7 +1256,7 @@ mod tests {
         ];
 
         let iter = input.into_iter().map(Ok);
-        let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true));
+        let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
         let result = collect_iterator_results(&mut dedup_iter);
         check_record_batches_equal(&expected, &result);
         assert_eq!(1, dedup_iter.metrics.num_unselected_rows);
diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs
index 10238a7c75..109aef108e 100644
--- a/src/mito2/src/read/flat_merge.rs
+++ b/src/mito2/src/read/flat_merge.rs
@@ -16,17 +16,20 @@ use std::cmp::Ordering;
 use std::collections::BinaryHeap;
 use std::sync::Arc;
 
+use async_stream::try_stream;
 use datatypes::arrow::array::{Int64Array, UInt64Array};
 use datatypes::arrow::compute::interleave;
 use datatypes::arrow::datatypes::SchemaRef;
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::arrow_array::BinaryArray;
 use datatypes::timestamp::timestamp_array_to_primitive;
+use futures::{Stream, TryStreamExt};
 use snafu::ResultExt;
 use store_api::storage::SequenceNumber;
 
 use crate::error::{ComputeArrowSnafu, Result};
 use crate::memtable::BoxedRecordBatchIterator;
+use crate::read::BoxedRecordBatchStream;
 use crate::sst::parquet::flat_format::{
     primary_key_column_index, sequence_column_index, time_index_column_index,
 };
@@ -422,7 +425,7 @@ impl Ord for RowCursor {
 /// Iterator to merge multiple sorted iterators into a single sorted iterator.
 ///
 /// All iterators must be sorted by primary key, time index, sequence desc.
-pub struct MergeIterator {
+pub struct FlatMergeIterator {
     /// The merge algorithm to maintain heaps.
     algo: MergeAlgo,
     /// Current buffered rows to output.
     in_progress: BatchBuilder,
@@ -435,7 +438,7 @@ pub struct MergeIterator {
     batch_size: usize,
 }
 
-impl MergeIterator {
+impl FlatMergeIterator {
     /// Creates a new iterator to merge sorted `iters`.
     pub fn new(
         schema: SchemaRef,
         iters: Vec<BoxedRecordBatchIterator>,
         batch_size: usize,
@@ -537,12 +540,147 @@ impl MergeIterator {
     }
 }
 
+impl Iterator for FlatMergeIterator {
+    type Item = Result<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next_batch().transpose()
+    }
+}
+
+/// Reader to merge multiple sorted streams into a single sorted stream.
+///
+/// All streams must be sorted by primary key, time index, sequence desc.
+pub struct MergeReader {
+    /// The merge algorithm to maintain heaps.
+    algo: MergeAlgo,
+    /// Current buffered rows to output.
+    in_progress: BatchBuilder,
+    /// Non-empty batch to output.
+    output_batch: Option<RecordBatch>,
+    /// Batch size to merge rows.
+    /// This is not a hard limit, the reader may return smaller batches to avoid concatenating
+    /// rows.
+    batch_size: usize,
+}
+
+impl MergeReader {
+    /// Creates a new reader to merge sorted `iters`.
+    pub async fn new(
+        schema: SchemaRef,
+        iters: Vec<BoxedRecordBatchStream>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
+        let mut nodes = Vec::with_capacity(iters.len());
+        // Initialize nodes and the buffer.
+        for (node_index, iter) in iters.into_iter().enumerate() {
+            let mut node = StreamNode {
+                node_index,
+                iter,
+                cursor: None,
+            };
+            if let Some(batch) = node.advance_batch().await? {
+                in_progress.push_batch(node_index, batch);
+                nodes.push(node);
+            }
+        }
+
+        let algo = MergeAlgo::new(nodes);
+
+        Ok(Self {
+            algo,
+            in_progress,
+            output_batch: None,
+            batch_size,
+        })
+    }
+
+    /// Fetches next sorted batch.
+    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
+        while self.algo.has_rows() && self.output_batch.is_none() {
+            if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
+                // Only one batch in the hot heap, but we have pending rows, output the pending rows first.
+                self.output_batch = self.in_progress.build_record_batch()?;
+                debug_assert!(self.output_batch.is_some());
+            } else if self.algo.can_fetch_batch() {
+                self.fetch_batch_from_hottest().await?;
+            } else {
+                self.fetch_row_from_hottest().await?;
+            }
+        }
+
+        if let Some(batch) = self.output_batch.take() {
+            Ok(Some(batch))
+        } else {
+            // No more batches.
+            Ok(None)
+        }
+    }
+
+    /// Converts the reader into a stream.
+    pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
+        try_stream! {
+            while let Some(batch) = self.next_batch().await? {
+                yield batch;
+            }
+        }
+    }
+
+    /// Fetches a batch from the hottest node.
+    async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
+        debug_assert!(self.in_progress.is_empty());
+
+        // Safety: next_batch() ensures the heap is not empty.
+        let mut hottest = self.algo.pop_hot().unwrap();
+        debug_assert!(!hottest.current_cursor().is_finished());
+        let next = hottest.advance_batch().await?;
+        // The node is in the heap, so it must have existing rows in the builder.
+        let batch = self
+            .in_progress
+            .take_remaining_rows(hottest.node_index, next);
+        Self::maybe_output_batch(batch, &mut self.output_batch);
+        self.algo.reheap(hottest);
+
+        Ok(())
+    }
+
+    /// Fetches a row from the hottest node.
+    async fn fetch_row_from_hottest(&mut self) -> Result<()> {
+        // Safety: next_batch() ensures the heap has more than 1 element.
+        let mut hottest = self.algo.pop_hot().unwrap();
+        debug_assert!(!hottest.current_cursor().is_finished());
+        self.in_progress.push_row(hottest.node_index);
+        if self.in_progress.len() >= self.batch_size {
+            // We buffered enough rows.
+            if let Some(output) = self.in_progress.build_record_batch()? {
+                Self::maybe_output_batch(output, &mut self.output_batch);
+            }
+        }
+
+        if let Some(next) = hottest.advance_row().await? {
+            self.in_progress.push_batch(hottest.node_index, next);
+        }
+
+        self.algo.reheap(hottest);
+        Ok(())
+    }
+
+    /// Adds the batch to the output batch if it is not empty.
+    fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
+        debug_assert!(output_batch.is_none());
+        if batch.num_rows() > 0 {
+            *output_batch = Some(batch);
+        }
+    }
+}
+
 /// A sync node in the merge iterator.
-struct IterNode {
+struct GenericNode<T> {
     /// Index of the node.
     node_index: usize,
     /// Iterator of this `Node`.
-    iter: BoxedRecordBatchIterator,
+    iter: T,
     /// Current batch to be read. The node should ensure the batch is not empty (The
     /// cursor is not finished).
     ///
@@ -550,7 +688,53 @@ struct IterNode {
     cursor: Option<RowCursor>,
 }
 
-impl IterNode {
+impl<T> NodeCmp for GenericNode<T> {
+    fn is_eof(&self) -> bool {
+        self.cursor.is_none()
+    }
+
+    fn is_behind(&self, other: &Self) -> bool {
+        debug_assert!(!self.current_cursor().is_finished());
+        debug_assert!(!other.current_cursor().is_finished());
+
+        // We only compare pk and timestamp so nodes in the cold
+        // heap don't have overlapping timestamps with the hottest node
+        // in the hot heap.
+        self.current_cursor()
+            .first_primary_key()
+            .cmp(other.current_cursor().last_primary_key())
+            .then_with(|| {
+                self.current_cursor()
+                    .first_timestamp()
+                    .cmp(&other.current_cursor().last_timestamp())
+            })
+            == Ordering::Greater
+    }
+}
+
+impl<T> PartialEq for GenericNode<T> {
+    fn eq(&self, other: &GenericNode<T>) -> bool {
+        self.cursor == other.cursor
+    }
+}
+
+impl<T> Eq for GenericNode<T> {}
+
+impl<T> PartialOrd for GenericNode<T> {
+    fn partial_cmp(&self, other: &GenericNode<T>) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl<T> Ord for GenericNode<T> {
+    fn cmp(&self, other: &GenericNode<T>) -> Ordering {
+        // The std binary heap is a max heap, but we want the nodes ordered in
+        // ascending order, so we compare the nodes in reverse order.
+        other.cursor.cmp(&self.cursor)
+    }
+}
+
+impl<T> GenericNode<T> {
     /// Returns current cursor.
     ///
     /// # Panics
@@ -558,7 +742,9 @@ impl IterNode {
     fn current_cursor(&self) -> &RowCursor {
         self.cursor.as_ref().unwrap()
     }
+}
 
+impl GenericNode<BoxedRecordBatchIterator> {
     /// Fetches a new batch from the iter and updates the cursor.
     /// It advances the current batch.
     /// Returns the fetched new batch.
@@ -594,49 +780,42 @@ impl IterNode {
     }
 }
 
-impl NodeCmp for IterNode {
-    fn is_eof(&self) -> bool {
-        self.cursor.is_none()
+type StreamNode = GenericNode<BoxedRecordBatchStream>;
+type IterNode = GenericNode<BoxedRecordBatchIterator>;
+
+impl GenericNode<BoxedRecordBatchStream> {
+    /// Fetches a new batch from the iter and updates the cursor.
+    /// It advances the current batch.
+    /// Returns the fetched new batch.
+    async fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
+        let batch = self.advance_inner_iter().await?;
+        let columns = batch.as_ref().map(SortColumns::new);
+        self.cursor = columns.map(RowCursor::new);
+
+        Ok(batch)
     }
 
-    fn is_behind(&self, other: &Self) -> bool {
-        debug_assert!(!self.current_cursor().is_finished());
-        debug_assert!(!other.current_cursor().is_finished());
+    /// Skips one row.
+    /// Returns the next batch if the current batch is finished.
+    async fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
+        let cursor = self.cursor.as_mut().unwrap();
+        cursor.advance();
+        if !cursor.is_finished() {
+            return Ok(None);
+        }
 
-        // We only compare pk and timestamp so nodes in the cold
-        // heap don't have overlapping timestamps with the hottest node
-        // in the hot heap.
-        self.current_cursor()
-            .first_primary_key()
-            .cmp(other.current_cursor().last_primary_key())
-            .then_with(|| {
-                self.current_cursor()
-                    .first_timestamp()
-                    .cmp(&other.current_cursor().last_timestamp())
-            })
-            == Ordering::Greater
+        // Finished current batch, need to fetch a new batch.
+        self.advance_batch().await
     }
-}
 
-impl PartialEq for IterNode {
-    fn eq(&self, other: &IterNode) -> bool {
-        self.cursor == other.cursor
-    }
-}
-
-impl Eq for IterNode {}
-
-impl PartialOrd for IterNode {
-    fn partial_cmp(&self, other: &IterNode) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for IterNode {
-    fn cmp(&self, other: &IterNode) -> Ordering {
-        // The std binary heap is a max heap, but we want the nodes are ordered in
-        // ascend order, so we compare the nodes in reverse order.
-        other.cursor.cmp(&self.cursor)
+    /// Fetches a non-empty batch from the iter.
+    async fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
+        while let Some(batch) = self.iter.try_next().await? {
+            if batch.num_rows() > 0 {
+                return Ok(Some(batch));
+            }
+        }
+        Ok(None)
     }
 }
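
With the node machinery in place, the async reader composes in a few lines; a minimal sketch (the `streams` input and driver are hypothetical, and the batch size of 1024 simply mirrors the benchmark above):

    use datatypes::arrow::datatypes::SchemaRef;
    use datatypes::arrow::record_batch::RecordBatch;
    use futures::TryStreamExt;

    use crate::error::Result;
    use crate::read::flat_merge::MergeReader;
    use crate::read::BoxedRecordBatchStream;

    // Hypothetical driver: merge already-sorted streams into one sorted
    // stream of record batches and collect the result.
    async fn merge_sorted(
        schema: SchemaRef,
        streams: Vec<BoxedRecordBatchStream>,
    ) -> Result<Vec<RecordBatch>> {
        let reader = MergeReader::new(schema, streams, 1024).await?;
        reader.into_stream().try_collect().await
    }
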
@@ -711,15 +890,9 @@ mod tests {
         }
     }
 
-    /// Helper function to collect all batches from a MergeIterator.
-    fn collect_merge_iterator_batches(
-        mut iter: MergeIterator,
-    ) -> crate::error::Result<Vec<RecordBatch>> {
-        let mut batches = Vec::new();
-        while let Some(batch) = iter.next_batch()? {
-            batches.push(batch);
-        }
-        Ok(batches)
+    /// Helper function to collect all batches from a FlatMergeIterator.
+    fn collect_merge_iterator_batches(iter: FlatMergeIterator) -> Vec<RecordBatch> {
+        iter.map(|result| result.unwrap()).collect()
     }
 
     #[test]
@@ -740,7 +913,7 @@ mod tests {
             Field::new("__op_type", DataType::UInt8, false),
         ]));
 
-        let mut merge_iter = MergeIterator::new(schema, vec![], 1024).unwrap();
+        let mut merge_iter = FlatMergeIterator::new(schema, vec![], 1024).unwrap();
         assert!(merge_iter.next_batch().unwrap().is_none());
     }
 
@@ -757,8 +930,8 @@ mod tests {
         let schema = batch.schema();
         let iter = Box::new(new_test_iter(vec![batch.clone()]));
 
-        let merge_iter = MergeIterator::new(schema, vec![iter], 1024).unwrap();
-        let result = collect_merge_iterator_batches(merge_iter).unwrap();
+        let merge_iter = FlatMergeIterator::new(schema, vec![iter], 1024).unwrap();
+        let result = collect_merge_iterator_batches(merge_iter);
 
         assert_eq!(result.len(), 1);
         assert_record_batches_eq(&[batch], &result);
@@ -792,8 +965,8 @@ mod tests {
         let iter1 = Box::new(new_test_iter(vec![batch1.clone(), batch3.clone()]));
         let iter2 = Box::new(new_test_iter(vec![batch2.clone()]));
 
-        let merge_iter = MergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
-        let result = collect_merge_iterator_batches(merge_iter).unwrap();
+        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
+        let result = collect_merge_iterator_batches(merge_iter);
 
         // Results should be sorted by primary key, timestamp, sequence desc
         let expected = vec![batch1, batch2, batch3];
@@ -822,8 +995,8 @@ mod tests {
         let iter1 = Box::new(new_test_iter(vec![batch1]));
         let iter2 = Box::new(new_test_iter(vec![batch2]));
 
-        let merge_iter = MergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
-        let result = collect_merge_iterator_batches(merge_iter).unwrap();
+        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
+        let result = collect_merge_iterator_batches(merge_iter);
 
         let expected = vec![
             create_test_record_batch(
@@ -861,8 +1034,8 @@ mod tests {
         let iter1 = Box::new(new_test_iter(vec![batch1]));
         let iter2 = Box::new(new_test_iter(vec![batch2]));
 
-        let merge_iter = MergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
-        let result = collect_merge_iterator_batches(merge_iter).unwrap();
+        let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
+        let result = collect_merge_iterator_batches(merge_iter);
 
         // Should be sorted by sequence descending for same key/timestamp
         let expected = vec![
diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs
index ddf8271013..86d7014521 100644
--- a/src/mito2/src/read/flat_projection.rs
+++ b/src/mito2/src/read/flat_projection.rs
@@ -126,7 +126,7 @@ impl FlatProjectionMapper {
             })
             .collect();
 
-        let batch_schema = plain_projected_columns(metadata, &format_projection);
+        let batch_schema = flat_projected_columns(metadata, &format_projection);
 
         Ok(FlatProjectionMapper {
             metadata: metadata.clone(),
@@ -162,7 +162,7 @@ impl FlatProjectionMapper {
 
     /// Returns the schema of converted [RecordBatch].
     /// This is the schema that the stream will output. This schema may contain
-    /// less columns than [ProjectionMapper::column_ids()].
+    /// fewer columns than [FlatProjectionMapper::column_ids()].
     pub(crate) fn output_schema(&self) -> SchemaRef {
         self.output_schema.clone()
     }
@@ -198,7 +198,7 @@ impl FlatProjectionMapper {
 }
 
 /// Returns ids and datatypes of columns of the output batch after applying the `projection`.
-pub(crate) fn plain_projected_columns(
+pub(crate) fn flat_projected_columns(
     metadata: &RegionMetadata,
     format_projection: &FormatProjection,
 ) -> Vec<(ColumnId, ConcreteDataType)> {
diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs
index 1f8bb0ed20..447e59f1cd 100644
--- a/src/mito2/src/read/projection.rs
+++ b/src/mito2/src/read/projection.rs
@@ -695,7 +695,7 @@ mod tests {
     }
 
     #[test]
-    fn test_plain_projection_mapper_all() {
+    fn test_flat_projection_mapper_all() {
         let metadata = Arc::new(
             TestRegionMetadataBuilder::default()
                 .num_tags(2)
@@ -729,7 +729,7 @@ mod tests {
     }
 
     #[test]
-    fn test_plain_projection_mapper_with_projection() {
+    fn test_flat_projection_mapper_with_projection() {
         let metadata = Arc::new(
             TestRegionMetadataBuilder::default()
                 .num_tags(2)
@@ -761,7 +761,7 @@ mod tests {
     }
 
     #[test]
-    fn test_plain_projection_mapper_empty_projection() {
+    fn test_flat_projection_mapper_empty_projection() {
         let metadata = Arc::new(
             TestRegionMetadataBuilder::default()
                 .num_tags(2)
@@ -772,11 +772,11 @@ mod tests {
         let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
         assert_eq!([0], mapper.column_ids()); // Should still read the time index column
         assert!(mapper.output_schema().is_empty());
-        let plain_mapper = mapper.as_flat().unwrap();
-        assert!(plain_mapper.batch_schema().is_empty());
+        let flat_mapper = mapper.as_flat().unwrap();
+        assert!(flat_mapper.batch_schema().is_empty());
 
         let batch = new_flat_batch(Some(0), &[], &[], 3);
-        let record_batch = plain_mapper.convert(&batch).unwrap();
+        let record_batch = flat_mapper.convert(&batch).unwrap();
         assert_eq!(3, record_batch.num_rows());
         assert_eq!(0, record_batch.num_columns());
         assert!(record_batch.schema.is_empty());
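
Taken together, the two async readers are designed to chain: merge first, then dedup the merged output. A combined sketch (everything except `MergeReader`, `DedupReader`, `FlatLastRow`, and the type aliases is hypothetical; pinning the merged stream satisfies `DedupReader`'s `Unpin` bound):

    use datatypes::arrow::datatypes::SchemaRef;
    use datatypes::arrow::record_batch::RecordBatch;
    use futures::TryStreamExt;

    use crate::error::Result;
    use crate::read::flat_dedup::{DedupReader, FlatLastRow};
    use crate::read::flat_merge::MergeReader;
    use crate::read::BoxedRecordBatchStream;

    // Hypothetical pipeline: merge sorted streams, keep the last row per key
    // while dropping deletes, then collect the final batches.
    async fn merge_then_dedup(
        schema: SchemaRef,
        streams: Vec<BoxedRecordBatchStream>,
    ) -> Result<Vec<RecordBatch>> {
        let merged = MergeReader::new(schema, streams, 1024).await?;
        let deduped = DedupReader::new(Box::pin(merged.into_stream()), FlatLastRow::new(true));
        deduped.into_stream().try_collect().await
    }
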