mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 22:32:55 +00:00
feat: Implements async FlatMergeReader and FlatDedupReader (#6761)
* refactor: Add Flat prefix to MergeIterator and DedupIterator Signed-off-by: evenyag <realevenyag@gmail.com> * feat: implement MergeReader for RecordBatch Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: use GenericNode for IterNode Signed-off-by: evenyag <realevenyag@gmail.com> * feat: flat merge reader to stream Signed-off-by: evenyag <realevenyag@gmail.com> * feat: implement FlatDedupReader Signed-off-by: evenyag <realevenyag@gmail.com> * chore: add a benchmark for FlatMergeIterator Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: rename plain_projection to flat_projection Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -27,6 +27,7 @@ use mito2::memtable::bulk::part_reader::BulkPartRecordBatchIter;
|
||||
use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
|
||||
use mito2::memtable::time_series::TimeSeriesMemtable;
|
||||
use mito2::memtable::{KeyValues, Memtable};
|
||||
use mito2::read::flat_merge::FlatMergeIterator;
|
||||
use mito2::region::options::MergeMode;
|
||||
use mito2::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
|
||||
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
|
||||
@@ -423,6 +424,70 @@ fn bulk_part_converter(c: &mut Criterion) {
|
||||
}
|
||||
}
|
||||
|
||||
fn flat_merge_iterator_bench(c: &mut Criterion) {
|
||||
let metadata = Arc::new(cpu_metadata());
|
||||
let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
|
||||
let start_sec = 1710043200;
|
||||
|
||||
let mut group = c.benchmark_group("flat_merge_iterator");
|
||||
group.sample_size(10);
|
||||
|
||||
for &num_parts in &[8, 16, 32, 64, 128, 256, 512] {
|
||||
// Pre-create BulkParts with different timestamps but same hosts (1024)
|
||||
let mut bulk_parts = Vec::with_capacity(num_parts);
|
||||
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
|
||||
|
||||
for part_idx in 0..num_parts {
|
||||
let generator = CpuDataGenerator::new(
|
||||
metadata.clone(),
|
||||
1024, // 1024 hosts per part
|
||||
start_sec + part_idx as i64 * 10, // Different timestamps for each part
|
||||
start_sec + part_idx as i64 * 10 + 1,
|
||||
);
|
||||
|
||||
let mut converter =
|
||||
BulkPartConverter::new(&metadata, schema.clone(), 1024, codec.clone(), true);
|
||||
if let Some(kvs) = generator.iter().next() {
|
||||
converter.append_key_values(&kvs).unwrap();
|
||||
}
|
||||
let bulk_part = converter.convert().unwrap();
|
||||
bulk_parts.push(bulk_part);
|
||||
}
|
||||
|
||||
// Pre-create BulkIterContext
|
||||
let context = Arc::new(BulkIterContext::new(
|
||||
metadata.clone(),
|
||||
&None, // No projection
|
||||
None, // No predicate
|
||||
));
|
||||
|
||||
group.bench_with_input(
|
||||
format!("{}_parts_1024_hosts", num_parts),
|
||||
&num_parts,
|
||||
|b, _| {
|
||||
b.iter(|| {
|
||||
// Create iterators from BulkParts
|
||||
let mut iters = Vec::with_capacity(num_parts);
|
||||
for bulk_part in &bulk_parts {
|
||||
let iter = BulkPartRecordBatchIter::new(
|
||||
bulk_part.batch.clone(),
|
||||
context.clone(),
|
||||
None, // No sequence filter
|
||||
);
|
||||
iters.push(Box::new(iter) as _);
|
||||
}
|
||||
|
||||
// Create and consume FlatMergeIterator
|
||||
let merge_iter = FlatMergeIterator::new(schema.clone(), iters, 1024).unwrap();
|
||||
for batch_result in merge_iter {
|
||||
let _batch = batch_result.unwrap();
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
|
||||
let metadata = Arc::new(cpu_metadata());
|
||||
let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
|
||||
@@ -498,6 +563,7 @@ criterion_group!(
|
||||
full_scan,
|
||||
filter_1_host,
|
||||
bulk_part_converter,
|
||||
bulk_part_record_batch_iter_filter
|
||||
bulk_part_record_batch_iter_filter,
|
||||
flat_merge_iterator_bench
|
||||
);
|
||||
criterion_main!(benches);
|
||||
|
||||
@@ -43,6 +43,7 @@ use datafusion_common::arrow::array::UInt8Array;
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
|
||||
use datatypes::arrow::compute::SortOptions;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::arrow::row::{RowConverter, SortField};
|
||||
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
@@ -1014,6 +1015,9 @@ pub type BoxedBatchReader = Box<dyn BatchReader>;
|
||||
/// Pointer to a stream that yields [Batch].
|
||||
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
|
||||
|
||||
/// Pointer to a stream that yields [RecordBatch].
|
||||
pub type BoxedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
use std::ops::Range;
|
||||
|
||||
use api::v1::OpType;
|
||||
use async_stream::try_stream;
|
||||
use datatypes::arrow::array::{
|
||||
make_comparator, Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, UInt64Array,
|
||||
UInt8Array,
|
||||
@@ -30,6 +31,7 @@ use datatypes::arrow::compute::{
|
||||
};
|
||||
use datatypes::arrow::error::ArrowError;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result};
|
||||
@@ -41,13 +43,13 @@ use crate::sst::parquet::flat_format::{
|
||||
use crate::sst::parquet::format::{PrimaryKeyArray, FIXED_POS_COLUMN_NUM};
|
||||
|
||||
/// An iterator to dedup sorted batches from an iterator based on the dedup strategy.
|
||||
pub struct DedupIterator<I, S> {
|
||||
pub struct FlatDedupIterator<I, S> {
|
||||
iter: I,
|
||||
strategy: S,
|
||||
metrics: DedupMetrics,
|
||||
}
|
||||
|
||||
impl<I, S> DedupIterator<I, S> {
|
||||
impl<I, S> FlatDedupIterator<I, S> {
|
||||
/// Creates a new dedup iterator.
|
||||
pub fn new(iter: I, strategy: S) -> Self {
|
||||
Self {
|
||||
@@ -58,7 +60,7 @@ impl<I, S> DedupIterator<I, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> DedupIterator<I, S> {
|
||||
impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> FlatDedupIterator<I, S> {
|
||||
/// Returns the next deduplicated batch.
|
||||
fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
while let Some(batch) = self.iter.next().transpose()? {
|
||||
@@ -72,7 +74,7 @@ impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Dedup
|
||||
}
|
||||
|
||||
impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Iterator
|
||||
for DedupIterator<I, S>
|
||||
for FlatDedupIterator<I, S>
|
||||
{
|
||||
type Item = Result<RecordBatch>;
|
||||
|
||||
@@ -81,6 +83,46 @@ impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Itera
|
||||
}
|
||||
}
|
||||
|
||||
/// An async reader to dedup sorted record batches from a stream based on the dedup strategy.
|
||||
pub struct DedupReader<I, S> {
|
||||
stream: I,
|
||||
strategy: S,
|
||||
metrics: DedupMetrics,
|
||||
}
|
||||
|
||||
impl<I, S> DedupReader<I, S> {
|
||||
/// Creates a new dedup iterator.
|
||||
pub fn new(stream: I, strategy: S) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
strategy,
|
||||
metrics: DedupMetrics::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy> DedupReader<I, S> {
|
||||
/// Returns the next deduplicated batch.
|
||||
async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
while let Some(batch) = self.stream.try_next().await? {
|
||||
if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
|
||||
return Ok(Some(batch));
|
||||
}
|
||||
}
|
||||
|
||||
self.strategy.finish(&mut self.metrics)
|
||||
}
|
||||
|
||||
/// Converts the reader into a stream.
|
||||
pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
|
||||
try_stream! {
|
||||
while let Some(batch) = self.fetch_next_batch().await? {
|
||||
yield batch;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Strategy to remove duplicate rows from sorted record batches.
|
||||
pub trait RecordBatchDedupStrategy: Send {
|
||||
/// Pushes a batch to the dedup strategy.
|
||||
@@ -826,7 +868,7 @@ mod tests {
|
||||
|
||||
// Test with filter_deleted = true
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastRow::new(true));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&input, &result);
|
||||
assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
|
||||
@@ -834,7 +876,7 @@ mod tests {
|
||||
|
||||
// Test with filter_deleted = false
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastRow::new(false));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&input, &result);
|
||||
assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
|
||||
@@ -890,7 +932,7 @@ mod tests {
|
||||
];
|
||||
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastRow::new(true));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(true));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&expected_filter_deleted, &result);
|
||||
assert_eq!(5, dedup_iter.metrics.num_unselected_rows);
|
||||
@@ -923,7 +965,7 @@ mod tests {
|
||||
];
|
||||
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastRow::new(false));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastRow::new(false));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&expected_no_filter, &result);
|
||||
assert_eq!(3, dedup_iter.metrics.num_unselected_rows);
|
||||
@@ -952,7 +994,7 @@ mod tests {
|
||||
|
||||
// Test with filter_deleted = true
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&input, &result);
|
||||
assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
|
||||
@@ -960,7 +1002,7 @@ mod tests {
|
||||
|
||||
// Test with filter_deleted = false
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, false));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&input, &result);
|
||||
assert_eq!(0, dedup_iter.metrics.num_unselected_rows);
|
||||
@@ -1059,7 +1101,7 @@ mod tests {
|
||||
];
|
||||
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&expected_filter_deleted, &result);
|
||||
assert_eq!(6, dedup_iter.metrics.num_unselected_rows);
|
||||
@@ -1105,7 +1147,7 @@ mod tests {
|
||||
];
|
||||
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, false));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, false));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&expected_no_filter, &result);
|
||||
assert_eq!(4, dedup_iter.metrics.num_unselected_rows);
|
||||
@@ -1156,7 +1198,7 @@ mod tests {
|
||||
];
|
||||
|
||||
let iter = input.into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&expected, &result);
|
||||
assert_eq!(2, dedup_iter.metrics.num_unselected_rows);
|
||||
@@ -1214,7 +1256,7 @@ mod tests {
|
||||
];
|
||||
|
||||
let iter = input.into_iter().map(Ok);
|
||||
let mut dedup_iter = DedupIterator::new(iter, FlatLastNonNull::new(1, true));
|
||||
let mut dedup_iter = FlatDedupIterator::new(iter, FlatLastNonNull::new(1, true));
|
||||
let result = collect_iterator_results(&mut dedup_iter);
|
||||
check_record_batches_equal(&expected, &result);
|
||||
assert_eq!(1, dedup_iter.metrics.num_unselected_rows);
|
||||
|
||||
@@ -16,17 +16,20 @@ use std::cmp::Ordering;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use datatypes::arrow::array::{Int64Array, UInt64Array};
|
||||
use datatypes::arrow::compute::interleave;
|
||||
use datatypes::arrow::datatypes::SchemaRef;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::arrow_array::BinaryArray;
|
||||
use datatypes::timestamp::timestamp_array_to_primitive;
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::SequenceNumber;
|
||||
|
||||
use crate::error::{ComputeArrowSnafu, Result};
|
||||
use crate::memtable::BoxedRecordBatchIterator;
|
||||
use crate::read::BoxedRecordBatchStream;
|
||||
use crate::sst::parquet::flat_format::{
|
||||
primary_key_column_index, sequence_column_index, time_index_column_index,
|
||||
};
|
||||
@@ -422,7 +425,7 @@ impl Ord for RowCursor {
|
||||
/// Iterator to merge multiple sorted iterators into a single sorted iterator.
|
||||
///
|
||||
/// All iterators must be sorted by primary key, time index, sequence desc.
|
||||
pub struct MergeIterator {
|
||||
pub struct FlatMergeIterator {
|
||||
/// The merge algorithm to maintain heaps.
|
||||
algo: MergeAlgo<IterNode>,
|
||||
/// Current buffered rows to output.
|
||||
@@ -435,7 +438,7 @@ pub struct MergeIterator {
|
||||
batch_size: usize,
|
||||
}
|
||||
|
||||
impl MergeIterator {
|
||||
impl FlatMergeIterator {
|
||||
/// Creates a new iterator to merge sorted `iters`.
|
||||
pub fn new(
|
||||
schema: SchemaRef,
|
||||
@@ -537,12 +540,147 @@ impl MergeIterator {
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for FlatMergeIterator {
|
||||
type Item = Result<RecordBatch>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.next_batch().transpose()
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterator to merge multiple sorted iterators into a single sorted iterator.
|
||||
///
|
||||
/// All iterators must be sorted by primary key, time index, sequence desc.
|
||||
pub struct MergeReader {
|
||||
/// The merge algorithm to maintain heaps.
|
||||
algo: MergeAlgo<StreamNode>,
|
||||
/// Current buffered rows to output.
|
||||
in_progress: BatchBuilder,
|
||||
/// Non-empty batch to output.
|
||||
output_batch: Option<RecordBatch>,
|
||||
/// Batch size to merge rows.
|
||||
/// This is not a hard limit, the iterator may return smaller batches to avoid concatenating
|
||||
/// rows.
|
||||
batch_size: usize,
|
||||
}
|
||||
|
||||
impl MergeReader {
|
||||
/// Creates a new iterator to merge sorted `iters`.
|
||||
pub async fn new(
|
||||
schema: SchemaRef,
|
||||
iters: Vec<BoxedRecordBatchStream>,
|
||||
batch_size: usize,
|
||||
) -> Result<Self> {
|
||||
let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
|
||||
let mut nodes = Vec::with_capacity(iters.len());
|
||||
// Initialize nodes and the buffer.
|
||||
for (node_index, iter) in iters.into_iter().enumerate() {
|
||||
let mut node = StreamNode {
|
||||
node_index,
|
||||
iter,
|
||||
cursor: None,
|
||||
};
|
||||
if let Some(batch) = node.advance_batch().await? {
|
||||
in_progress.push_batch(node_index, batch);
|
||||
nodes.push(node);
|
||||
}
|
||||
}
|
||||
|
||||
let algo = MergeAlgo::new(nodes);
|
||||
|
||||
Ok(Self {
|
||||
algo,
|
||||
in_progress,
|
||||
output_batch: None,
|
||||
batch_size,
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetches next sorted batch.
|
||||
pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
while self.algo.has_rows() && self.output_batch.is_none() {
|
||||
if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
|
||||
// Only one batch in the hot heap, but we have pending rows, output the pending rows first.
|
||||
self.output_batch = self.in_progress.build_record_batch()?;
|
||||
debug_assert!(self.output_batch.is_some());
|
||||
} else if self.algo.can_fetch_batch() {
|
||||
self.fetch_batch_from_hottest().await?;
|
||||
} else {
|
||||
self.fetch_row_from_hottest().await?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(batch) = self.output_batch.take() {
|
||||
Ok(Some(batch))
|
||||
} else {
|
||||
// No more batches.
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts the reader into a stream.
|
||||
pub fn into_stream(mut self) -> impl Stream<Item = Result<RecordBatch>> {
|
||||
try_stream! {
|
||||
while let Some(batch) = self.next_batch().await? {
|
||||
yield batch;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetches a batch from the hottest node.
|
||||
async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
|
||||
debug_assert!(self.in_progress.is_empty());
|
||||
|
||||
// Safety: next_batch() ensures the heap is not empty.
|
||||
let mut hottest = self.algo.pop_hot().unwrap();
|
||||
debug_assert!(!hottest.current_cursor().is_finished());
|
||||
let next = hottest.advance_batch().await?;
|
||||
// The node is the heap is not empty, so it must have existing rows in the builder.
|
||||
let batch = self
|
||||
.in_progress
|
||||
.take_remaining_rows(hottest.node_index, next);
|
||||
Self::maybe_output_batch(batch, &mut self.output_batch);
|
||||
self.algo.reheap(hottest);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetches a row from the hottest node.
|
||||
async fn fetch_row_from_hottest(&mut self) -> Result<()> {
|
||||
// Safety: next_batch() ensures the heap has more than 1 element.
|
||||
let mut hottest = self.algo.pop_hot().unwrap();
|
||||
debug_assert!(!hottest.current_cursor().is_finished());
|
||||
self.in_progress.push_row(hottest.node_index);
|
||||
if self.in_progress.len() >= self.batch_size {
|
||||
// We buffered enough rows.
|
||||
if let Some(output) = self.in_progress.build_record_batch()? {
|
||||
Self::maybe_output_batch(output, &mut self.output_batch);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(next) = hottest.advance_row().await? {
|
||||
self.in_progress.push_batch(hottest.node_index, next);
|
||||
}
|
||||
|
||||
self.algo.reheap(hottest);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Adds the batch to the output batch if it is not empty.
|
||||
fn maybe_output_batch(batch: RecordBatch, output_batch: &mut Option<RecordBatch>) {
|
||||
debug_assert!(output_batch.is_none());
|
||||
if batch.num_rows() > 0 {
|
||||
*output_batch = Some(batch);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A sync node in the merge iterator.
|
||||
struct IterNode {
|
||||
struct GenericNode<T> {
|
||||
/// Index of the node.
|
||||
node_index: usize,
|
||||
/// Iterator of this `Node`.
|
||||
iter: BoxedRecordBatchIterator,
|
||||
iter: T,
|
||||
/// Current batch to be read. The node should ensure the batch is not empty (The
|
||||
/// cursor is not finished).
|
||||
///
|
||||
@@ -550,7 +688,53 @@ struct IterNode {
|
||||
cursor: Option<RowCursor>,
|
||||
}
|
||||
|
||||
impl IterNode {
|
||||
impl<T> NodeCmp for GenericNode<T> {
|
||||
fn is_eof(&self) -> bool {
|
||||
self.cursor.is_none()
|
||||
}
|
||||
|
||||
fn is_behind(&self, other: &Self) -> bool {
|
||||
debug_assert!(!self.current_cursor().is_finished());
|
||||
debug_assert!(!other.current_cursor().is_finished());
|
||||
|
||||
// We only compare pk and timestamp so nodes in the cold
|
||||
// heap don't have overlapping timestamps with the hottest node
|
||||
// in the hot heap.
|
||||
self.current_cursor()
|
||||
.first_primary_key()
|
||||
.cmp(other.current_cursor().last_primary_key())
|
||||
.then_with(|| {
|
||||
self.current_cursor()
|
||||
.first_timestamp()
|
||||
.cmp(&other.current_cursor().last_timestamp())
|
||||
})
|
||||
== Ordering::Greater
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> PartialEq for GenericNode<T> {
|
||||
fn eq(&self, other: &GenericNode<T>) -> bool {
|
||||
self.cursor == other.cursor
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Eq for GenericNode<T> {}
|
||||
|
||||
impl<T> PartialOrd for GenericNode<T> {
|
||||
fn partial_cmp(&self, other: &GenericNode<T>) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Ord for GenericNode<T> {
|
||||
fn cmp(&self, other: &GenericNode<T>) -> Ordering {
|
||||
// The std binary heap is a max heap, but we want the nodes are ordered in
|
||||
// ascend order, so we compare the nodes in reverse order.
|
||||
other.cursor.cmp(&self.cursor)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> GenericNode<T> {
|
||||
/// Returns current cursor.
|
||||
///
|
||||
/// # Panics
|
||||
@@ -558,7 +742,9 @@ impl IterNode {
|
||||
fn current_cursor(&self) -> &RowCursor {
|
||||
self.cursor.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl GenericNode<BoxedRecordBatchIterator> {
|
||||
/// Fetches a new batch from the iter and updates the cursor.
|
||||
/// It advances the current batch.
|
||||
/// Returns the fetched new batch.
|
||||
@@ -594,49 +780,42 @@ impl IterNode {
|
||||
}
|
||||
}
|
||||
|
||||
impl NodeCmp for IterNode {
|
||||
fn is_eof(&self) -> bool {
|
||||
self.cursor.is_none()
|
||||
type StreamNode = GenericNode<BoxedRecordBatchStream>;
|
||||
type IterNode = GenericNode<BoxedRecordBatchIterator>;
|
||||
|
||||
impl GenericNode<BoxedRecordBatchStream> {
|
||||
/// Fetches a new batch from the iter and updates the cursor.
|
||||
/// It advances the current batch.
|
||||
/// Returns the fetched new batch.
|
||||
async fn advance_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
let batch = self.advance_inner_iter().await?;
|
||||
let columns = batch.as_ref().map(SortColumns::new);
|
||||
self.cursor = columns.map(RowCursor::new);
|
||||
|
||||
Ok(batch)
|
||||
}
|
||||
|
||||
fn is_behind(&self, other: &Self) -> bool {
|
||||
debug_assert!(!self.current_cursor().is_finished());
|
||||
debug_assert!(!other.current_cursor().is_finished());
|
||||
/// Skips one row.
|
||||
/// Returns the next batch if the current batch is finished.
|
||||
async fn advance_row(&mut self) -> Result<Option<RecordBatch>> {
|
||||
let cursor = self.cursor.as_mut().unwrap();
|
||||
cursor.advance();
|
||||
if !cursor.is_finished() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// We only compare pk and timestamp so nodes in the cold
|
||||
// heap don't have overlapping timestamps with the hottest node
|
||||
// in the hot heap.
|
||||
self.current_cursor()
|
||||
.first_primary_key()
|
||||
.cmp(other.current_cursor().last_primary_key())
|
||||
.then_with(|| {
|
||||
self.current_cursor()
|
||||
.first_timestamp()
|
||||
.cmp(&other.current_cursor().last_timestamp())
|
||||
})
|
||||
== Ordering::Greater
|
||||
// Finished current batch, need to fetch a new batch.
|
||||
self.advance_batch().await
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for IterNode {
|
||||
fn eq(&self, other: &IterNode) -> bool {
|
||||
self.cursor == other.cursor
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for IterNode {}
|
||||
|
||||
impl PartialOrd for IterNode {
|
||||
fn partial_cmp(&self, other: &IterNode) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for IterNode {
|
||||
fn cmp(&self, other: &IterNode) -> Ordering {
|
||||
// The std binary heap is a max heap, but we want the nodes are ordered in
|
||||
// ascend order, so we compare the nodes in reverse order.
|
||||
other.cursor.cmp(&self.cursor)
|
||||
/// Fetches a non-empty batch from the iter.
|
||||
async fn advance_inner_iter(&mut self) -> Result<Option<RecordBatch>> {
|
||||
while let Some(batch) = self.iter.try_next().await? {
|
||||
if batch.num_rows() > 0 {
|
||||
return Ok(Some(batch));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -711,15 +890,9 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to collect all batches from a MergeIterator.
|
||||
fn collect_merge_iterator_batches(
|
||||
mut iter: MergeIterator,
|
||||
) -> crate::error::Result<Vec<RecordBatch>> {
|
||||
let mut batches = Vec::new();
|
||||
while let Some(batch) = iter.next_batch()? {
|
||||
batches.push(batch);
|
||||
}
|
||||
Ok(batches)
|
||||
/// Helper function to collect all batches from a FlatMergeIterator.
|
||||
fn collect_merge_iterator_batches(iter: FlatMergeIterator) -> Vec<RecordBatch> {
|
||||
iter.map(|result| result.unwrap()).collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -740,7 +913,7 @@ mod tests {
|
||||
Field::new("__op_type", DataType::UInt8, false),
|
||||
]));
|
||||
|
||||
let mut merge_iter = MergeIterator::new(schema, vec![], 1024).unwrap();
|
||||
let mut merge_iter = FlatMergeIterator::new(schema, vec![], 1024).unwrap();
|
||||
assert!(merge_iter.next_batch().unwrap().is_none());
|
||||
}
|
||||
|
||||
@@ -757,8 +930,8 @@ mod tests {
|
||||
let schema = batch.schema();
|
||||
let iter = Box::new(new_test_iter(vec![batch.clone()]));
|
||||
|
||||
let merge_iter = MergeIterator::new(schema, vec![iter], 1024).unwrap();
|
||||
let result = collect_merge_iterator_batches(merge_iter).unwrap();
|
||||
let merge_iter = FlatMergeIterator::new(schema, vec![iter], 1024).unwrap();
|
||||
let result = collect_merge_iterator_batches(merge_iter);
|
||||
|
||||
assert_eq!(result.len(), 1);
|
||||
assert_record_batches_eq(&[batch], &result);
|
||||
@@ -792,8 +965,8 @@ mod tests {
|
||||
let iter1 = Box::new(new_test_iter(vec![batch1.clone(), batch3.clone()]));
|
||||
let iter2 = Box::new(new_test_iter(vec![batch2.clone()]));
|
||||
|
||||
let merge_iter = MergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
|
||||
let result = collect_merge_iterator_batches(merge_iter).unwrap();
|
||||
let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
|
||||
let result = collect_merge_iterator_batches(merge_iter);
|
||||
|
||||
// Results should be sorted by primary key, timestamp, sequence desc
|
||||
let expected = vec![batch1, batch2, batch3];
|
||||
@@ -822,8 +995,8 @@ mod tests {
|
||||
let iter1 = Box::new(new_test_iter(vec![batch1]));
|
||||
let iter2 = Box::new(new_test_iter(vec![batch2]));
|
||||
|
||||
let merge_iter = MergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
|
||||
let result = collect_merge_iterator_batches(merge_iter).unwrap();
|
||||
let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
|
||||
let result = collect_merge_iterator_batches(merge_iter);
|
||||
|
||||
let expected = vec![
|
||||
create_test_record_batch(
|
||||
@@ -861,8 +1034,8 @@ mod tests {
|
||||
let iter1 = Box::new(new_test_iter(vec![batch1]));
|
||||
let iter2 = Box::new(new_test_iter(vec![batch2]));
|
||||
|
||||
let merge_iter = MergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
|
||||
let result = collect_merge_iterator_batches(merge_iter).unwrap();
|
||||
let merge_iter = FlatMergeIterator::new(schema, vec![iter1, iter2], 1024).unwrap();
|
||||
let result = collect_merge_iterator_batches(merge_iter);
|
||||
|
||||
// Should be sorted by sequence descending for same key/timestamp
|
||||
let expected = vec![
|
||||
|
||||
@@ -126,7 +126,7 @@ impl FlatProjectionMapper {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let batch_schema = plain_projected_columns(metadata, &format_projection);
|
||||
let batch_schema = flat_projected_columns(metadata, &format_projection);
|
||||
|
||||
Ok(FlatProjectionMapper {
|
||||
metadata: metadata.clone(),
|
||||
@@ -162,7 +162,7 @@ impl FlatProjectionMapper {
|
||||
|
||||
/// Returns the schema of converted [RecordBatch].
|
||||
/// This is the schema that the stream will output. This schema may contain
|
||||
/// less columns than [ProjectionMapper::column_ids()].
|
||||
/// less columns than [FlatProjectionMapper::column_ids()].
|
||||
pub(crate) fn output_schema(&self) -> SchemaRef {
|
||||
self.output_schema.clone()
|
||||
}
|
||||
@@ -198,7 +198,7 @@ impl FlatProjectionMapper {
|
||||
}
|
||||
|
||||
/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
|
||||
pub(crate) fn plain_projected_columns(
|
||||
pub(crate) fn flat_projected_columns(
|
||||
metadata: &RegionMetadata,
|
||||
format_projection: &FormatProjection,
|
||||
) -> Vec<(ColumnId, ConcreteDataType)> {
|
||||
|
||||
@@ -695,7 +695,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_plain_projection_mapper_all() {
|
||||
fn test_flat_projection_mapper_all() {
|
||||
let metadata = Arc::new(
|
||||
TestRegionMetadataBuilder::default()
|
||||
.num_tags(2)
|
||||
@@ -729,7 +729,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_plain_projection_mapper_with_projection() {
|
||||
fn test_flat_projection_mapper_with_projection() {
|
||||
let metadata = Arc::new(
|
||||
TestRegionMetadataBuilder::default()
|
||||
.num_tags(2)
|
||||
@@ -761,7 +761,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_plain_projection_mapper_empty_projection() {
|
||||
fn test_flat_projection_mapper_empty_projection() {
|
||||
let metadata = Arc::new(
|
||||
TestRegionMetadataBuilder::default()
|
||||
.num_tags(2)
|
||||
@@ -772,11 +772,11 @@ mod tests {
|
||||
let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
|
||||
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
|
||||
assert!(mapper.output_schema().is_empty());
|
||||
let plain_mapper = mapper.as_flat().unwrap();
|
||||
assert!(plain_mapper.batch_schema().is_empty());
|
||||
let flat_mapper = mapper.as_flat().unwrap();
|
||||
assert!(flat_mapper.batch_schema().is_empty());
|
||||
|
||||
let batch = new_flat_batch(Some(0), &[], &[], 3);
|
||||
let record_batch = plain_mapper.convert(&batch).unwrap();
|
||||
let record_batch = flat_mapper.convert(&batch).unwrap();
|
||||
assert_eq!(3, record_batch.num_rows());
|
||||
assert_eq!(0, record_batch.num_columns());
|
||||
assert!(record_batch.schema.is_empty());
|
||||
|
||||
Reference in New Issue
Block a user