From fd94f5519398af27f9fb7119abffa43ac0315e70 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 10 Apr 2026 11:12:33 +0800 Subject: [PATCH] refactor(mito2): remove dead scan code (#7925) * refactor(mito2): remove dead batch parallel scan helpers Signed-off-by: evenyag * refactor(mito2): remove dead merge reader path Signed-off-by: evenyag * refactor(mito2): remove dead batch dedup reader Signed-off-by: evenyag * test(mito2): remove obsolete batch source helper Signed-off-by: evenyag * refactor: remove unused plain batch Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/benches/simple_bulk_memtable.rs | 47 +- .../src/memtable/simple_bulk_memtable.rs | 106 +- src/mito2/src/read.rs | 2 - src/mito2/src/read/dedup.rs | 544 +--------- src/mito2/src/read/flat_merge.rs | 82 +- src/mito2/src/read/merge.rs | 982 ------------------ src/mito2/src/read/plain_batch.rs | 505 --------- src/mito2/src/read/scan_region.rs | 78 +- src/mito2/src/read/scan_util.rs | 2 +- src/mito2/src/sst.rs | 28 - src/mito2/src/test_util/sst_util.rs | 10 +- 11 files changed, 94 insertions(+), 2292 deletions(-) delete mode 100644 src/mito2/src/read/merge.rs delete mode 100644 src/mito2/src/read/plain_batch.rs diff --git a/src/mito2/benches/simple_bulk_memtable.rs b/src/mito2/benches/simple_bulk_memtable.rs index 05035734de..8a199f46f1 100644 --- a/src/mito2/benches/simple_bulk_memtable.rs +++ b/src/mito2/benches/simple_bulk_memtable.rs @@ -21,11 +21,7 @@ use criterion::{Criterion, criterion_group, criterion_main}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; use mito2::memtable::simple_bulk_memtable::SimpleBulkMemtable; -use mito2::memtable::{IterBuilder, KeyValues, Memtable, MemtableRanges, RangesOptions}; -use mito2::read; -use mito2::read::Source; -use mito2::read::dedup::DedupReader; -use mito2::read::merge::MergeReaderBuilder; +use mito2::memtable::{IterBuilder, KeyValues, Memtable, RangesOptions}; use mito2::region::options::MergeMode; use 
mito2::test_util::column_metadata_to_column_schema; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; @@ -126,36 +122,6 @@ fn create_memtable_with_rows(num_batches: usize) -> SimpleBulkMemtable { } async fn flush(mem: &SimpleBulkMemtable) { - let MemtableRanges { ranges, .. } = mem.ranges(None, RangesOptions::for_flush()).unwrap(); - - let mut source = if ranges.len() == 1 { - let only_range = ranges.into_values().next().unwrap(); - let iter = only_range.build_iter().unwrap(); - Source::Iter(iter) - } else { - let sources = ranges - .into_values() - .map(|r| r.build_iter().map(Source::Iter)) - .collect::>>() - .unwrap(); - let merge_reader = MergeReaderBuilder::from_sources(sources) - .build() - .await - .unwrap(); - let reader = Box::new(DedupReader::new( - merge_reader, - read::dedup::LastRow::new(true), - None, - )); - Source::Reader(reader) - }; - - while let Some(b) = source.next_batch().await.unwrap() { - black_box(b); - } -} - -async fn flush_original(mem: &SimpleBulkMemtable) { let iter = mem .ranges(None, RangesOptions::default()) .unwrap() @@ -179,19 +145,10 @@ fn bench_ranges_parallel_vs_sequential(c: &mut Criterion) { let total_rows_k = num_batch * 10; let memtable = create_memtable_with_rows(num_batch); - group.bench_with_input( - BenchmarkId::new("flush_by_merge_reader", format!("{}k_rows", total_rows_k)), - &memtable, - |b, memtable| b.to_async(&rt).iter(|| async { flush(memtable).await }), - ); - group.bench_with_input( BenchmarkId::new("flush_by_iter", format!("{}k_rows", total_rows_k)), &memtable, - |b, memtable| { - b.to_async(&rt) - .iter(|| async { flush_original(memtable).await }) - }, + |b, memtable| b.to_async(&rt).iter(|| async { flush(memtable).await }), ); } diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index 1284741347..6ff799ebf5 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -421,10 +421,6 
@@ mod tests { use store_api::storage::{RegionId, SequenceNumber, SequenceRange}; use super::*; - use crate::read; - use crate::read::dedup::DedupReader; - use crate::read::merge::MergeReaderBuilder; - use crate::read::{BatchReader, Source}; use crate::region::options::MergeMode; use crate::test_util::column_metadata_to_column_schema; @@ -621,81 +617,6 @@ mod tests { assert_eq!(1, batch.num_rows()); } - #[tokio::test] - async fn test_write_dedup() { - let memtable = new_test_memtable(true, MergeMode::LastRow); - let kvs = build_key_values( - &memtable.region_metadata, - 0, - &[(1, 1.0, "a".to_string())], - OpType::Put, - ); - let kv = kvs.iter().next().unwrap(); - memtable.write_one(kv).unwrap(); - memtable.freeze().unwrap(); - - let kvs = build_key_values( - &memtable.region_metadata, - 1, - &[(1, 1.0, "a".to_string())], - OpType::Delete, - ); - let kv = kvs.iter().next().unwrap(); - memtable.write_one(kv).unwrap(); - - let ranges = memtable.ranges(None, RangesOptions::default()).unwrap(); - let mut source = vec![]; - for r in ranges.ranges.values() { - source.push(Source::Iter(r.build_iter().unwrap())); - } - - let reader = MergeReaderBuilder::from_sources(source) - .build() - .await - .unwrap(); - - let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None); - let mut num_rows = 0; - while let Some(b) = reader.next_batch().await.unwrap() { - num_rows += b.num_rows(); - } - assert_eq!(num_rows, 1); - } - - #[tokio::test] - async fn test_delete_only() { - let memtable = new_test_memtable(true, MergeMode::LastRow); - let kvs = build_key_values( - &memtable.region_metadata, - 0, - &[(1, 1.0, "a".to_string())], - OpType::Delete, - ); - let kv = kvs.iter().next().unwrap(); - memtable.write_one(kv).unwrap(); - memtable.freeze().unwrap(); - - let ranges = memtable.ranges(None, RangesOptions::default()).unwrap(); - let mut source = vec![]; - for r in ranges.ranges.values() { - source.push(Source::Iter(r.build_iter().unwrap())); - } - - let reader = 
MergeReaderBuilder::from_sources(source) - .build() - .await - .unwrap(); - - let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None); - let mut num_rows = 0; - while let Some(b) = reader.next_batch().await.unwrap() { - num_rows += b.num_rows(); - assert_eq!(b.num_rows(), 1); - assert_eq!(b.op_types().get_data(0).unwrap(), OpType::Delete as u8); - } - assert_eq!(num_rows, 1); - } - #[tokio::test] async fn test_single_range() { let memtable = new_test_memtable(true, MergeMode::LastRow); @@ -902,8 +823,8 @@ mod tests { .unwrap() } - #[tokio::test] - async fn test_write_read_large_string() { + #[test] + fn test_write_read_large_string() { let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); builder .push_column_metadata(ColumnMetadata { @@ -948,25 +869,12 @@ mod tests { .unwrap(); let MemtableRanges { ranges, .. } = memtable.ranges(None, RangesOptions::default()).unwrap(); - let mut source = if ranges.len() == 1 { - let only_range = ranges.into_values().next().unwrap(); - Source::Iter(only_range.build_iter().unwrap()) - } else { - let sources = ranges - .into_values() - .map(|r| r.build_iter().map(Source::Iter)) - .collect::>>() - .unwrap(); - let merge_reader = MergeReaderBuilder::from_sources(sources) - .build() - .await - .unwrap(); - Source::Reader(Box::new(merge_reader)) - }; - let mut rows = 0; - while let Some(b) = source.next_batch().await.unwrap() { - rows += b.num_rows(); + for range in ranges.into_values() { + let iter = range.build_iter().unwrap(); + for batch in iter { + rows += batch.unwrap().num_rows(); + } } assert_eq!(rows, 2); } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index db7dfd1958..90eb9a3da7 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -21,8 +21,6 @@ pub mod flat_dedup; pub mod flat_merge; pub mod flat_projection; pub mod last_row; -pub mod merge; -pub mod plain_batch; pub mod projection; pub(crate) mod prune; pub(crate) mod pruner; diff --git 
a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index 5c881459b2..86f6b07ffc 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -19,17 +19,13 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use api::v1::OpType; -use async_trait::async_trait; -use common_telemetry::debug; -use common_time::Timestamp; use datatypes::data_type::DataType; use datatypes::prelude::ScalarVector; use datatypes::value::Value; use datatypes::vectors::MutableVector; use crate::error::Result; -use crate::metrics::MERGE_FILTER_ROWS_TOTAL; -use crate::read::{Batch, BatchColumn, BatchReader}; +use crate::read::{Batch, BatchColumn}; /// Trait for reporting dedup metrics. pub trait DedupMetricsReport: Send + Sync { @@ -37,80 +33,6 @@ pub trait DedupMetricsReport: Send + Sync { fn report(&self, metrics: &mut DedupMetrics); } -/// A reader that dedup sorted batches from a source based on the -/// dedup strategy. -pub struct DedupReader { - source: R, - strategy: S, - metrics: DedupMetrics, - /// Optional metrics reporter. - metrics_reporter: Option>, -} - -impl DedupReader { - /// Creates a new dedup reader. - pub fn new( - source: R, - strategy: S, - metrics_reporter: Option>, - ) -> Self { - Self { - source, - strategy, - metrics: DedupMetrics::default(), - metrics_reporter, - } - } -} - -impl DedupReader { - /// Returns the next deduplicated batch. - async fn fetch_next_batch(&mut self) -> Result> { - while let Some(batch) = self.source.next_batch().await? { - if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? 
{ - self.metrics.maybe_report(&self.metrics_reporter); - return Ok(Some(batch)); - } - } - - let result = self.strategy.finish(&mut self.metrics)?; - self.metrics.maybe_report(&self.metrics_reporter); - Ok(result) - } -} - -#[async_trait] -impl BatchReader for DedupReader { - async fn next_batch(&mut self) -> Result> { - self.fetch_next_batch().await - } -} - -impl Drop for DedupReader { - fn drop(&mut self) { - debug!("Dedup reader finished, metrics: {:?}", self.metrics); - - MERGE_FILTER_ROWS_TOTAL - .with_label_values(&["dedup"]) - .inc_by(self.metrics.num_unselected_rows as u64); - MERGE_FILTER_ROWS_TOTAL - .with_label_values(&["delete"]) - .inc_by(self.metrics.num_unselected_rows as u64); - - // Report any remaining metrics. - if let Some(reporter) = &self.metrics_reporter { - reporter.report(&mut self.metrics); - } - } -} - -#[cfg(test)] -impl DedupReader { - fn metrics(&self) -> &DedupMetrics { - &self.metrics - } -} - /// Strategy to remove duplicate rows from sorted batches. pub trait DedupStrategy: Send { /// Pushes a batch to the dedup strategy. @@ -124,114 +46,6 @@ pub trait DedupStrategy: Send { fn finish(&mut self, metrics: &mut DedupMetrics) -> Result>; } -/// State of the last row in a batch for dedup. -struct BatchLastRow { - primary_key: Vec, - /// The last timestamp of the batch. - timestamp: Timestamp, -} - -/// Dedup strategy that keeps the row with latest sequence of each key. -/// -/// This strategy is optimized specially based on the properties of the SST files, -/// memtables and the merge reader. It assumes that batches from files and memtables -/// don't contain duplicate rows and the merge reader never concatenates batches from -/// different source. -/// -/// We might implement a new strategy if we need to process files with duplicate rows. -pub struct LastRow { - /// Meta of the last row in the previous batch that has the same key - /// as the batch to push. - prev_batch: Option, - /// Filter deleted rows. 
- filter_deleted: bool, -} - -impl LastRow { - /// Creates a new strategy with the given `filter_deleted` flag. - pub fn new(filter_deleted: bool) -> Self { - Self { - prev_batch: None, - filter_deleted, - } - } -} - -impl DedupStrategy for LastRow { - fn push_batch( - &mut self, - mut batch: Batch, - metrics: &mut DedupMetrics, - ) -> Result> { - let start = Instant::now(); - - if batch.is_empty() { - return Ok(None); - } - debug_assert!(batch.first_timestamp().is_some()); - let prev_timestamp = match &self.prev_batch { - Some(prev_batch) => { - if prev_batch.primary_key != batch.primary_key() { - // The key has changed. This is the first batch of the - // new key. - None - } else { - Some(prev_batch.timestamp) - } - } - None => None, - }; - if batch.first_timestamp() == prev_timestamp { - metrics.num_unselected_rows += 1; - // This batch contains a duplicate row, skip it. - if batch.num_rows() == 1 { - // We don't need to update `prev_batch` because they have the same - // key and timestamp. - metrics.dedup_cost += start.elapsed(); - return Ok(None); - } - // Skips the first row. - batch = batch.slice(1, batch.num_rows() - 1); - } - - // Store current batch to `prev_batch` so we could compare the next batch - // with this batch. We store batch before filtering it as rows with `OpType::Delete` - // would be removed from the batch after filter, then we may store an incorrect `last row` - // of previous batch. - match &mut self.prev_batch { - Some(prev) => { - // Reuse the primary key buffer. - prev.primary_key.clone_from(&batch.primary_key); - prev.timestamp = batch.last_timestamp().unwrap(); - } - None => { - self.prev_batch = Some(BatchLastRow { - primary_key: batch.primary_key().to_vec(), - timestamp: batch.last_timestamp().unwrap(), - }) - } - } - - // Filters deleted rows. - if self.filter_deleted { - filter_deleted_from_batch(&mut batch, metrics)?; - } - - metrics.dedup_cost += start.elapsed(); - - // The batch can become empty if all rows are deleted. 
- if batch.is_empty() { - Ok(None) - } else { - Ok(Some(batch)) - } - } - - fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result> { - Ok(None) - } -} - /// Removes deleted rows from the batch and updates metrics. fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> { let num_rows = batch.num_rows(); @@ -672,137 +486,10 @@ impl>> Iterator for LastNonNullIter { mod tests { use std::sync::Arc; - use api::v1::OpType; use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array}; use super::*; use crate::read::BatchBuilder; - use crate::test_util::{VecBatchReader, check_reader_result, new_batch}; - - #[tokio::test] - async fn test_dedup_reader_no_duplications() { - let input = [ - new_batch( - b"k1", - &[1, 2], - &[11, 12], - &[OpType::Put, OpType::Put], - &[21, 22], - ), - new_batch(b"k1", &[3], &[13], &[OpType::Put], &[23]), - new_batch( - b"k2", - &[1, 2], - &[111, 112], - &[OpType::Put, OpType::Put], - &[31, 32], - ), - ]; - - // Test last row. - let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastRow::new(true), None); - check_reader_result(&mut reader, &input).await; - assert_eq!(0, reader.metrics().num_unselected_rows); - assert_eq!(0, reader.metrics().num_deleted_rows); - - // Test last non-null. - let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(true), None); - check_reader_result(&mut reader, &input).await; - assert_eq!(0, reader.metrics().num_unselected_rows); - assert_eq!(0, reader.metrics().num_deleted_rows); - } - - #[tokio::test] - async fn test_dedup_reader_duplications() { - let input = [ - new_batch( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[11, 12], - ), - // empty batch. - new_batch(b"k1", &[], &[], &[], &[]), - // Duplicate with the previous batch. 
- new_batch( - b"k1", - &[2, 3, 4], - &[10, 13, 13], - &[OpType::Put, OpType::Put, OpType::Delete], - &[2, 13, 14], - ), - new_batch( - b"k2", - &[1, 2], - &[20, 20], - &[OpType::Put, OpType::Delete], - &[101, 0], - ), - new_batch(b"k2", &[2], &[19], &[OpType::Put], &[102]), - new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]), - // This batch won't increase the deleted rows count as it - // is filtered out by the previous batch. - new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]), - ]; - // Filter deleted. - let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastRow::new(true), None); - check_reader_result( - &mut reader, - &[ - new_batch( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[11, 12], - ), - new_batch(b"k1", &[3], &[13], &[OpType::Put], &[13]), - new_batch(b"k2", &[1], &[20], &[OpType::Put], &[101]), - new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]), - ], - ) - .await; - assert_eq!(5, reader.metrics().num_unselected_rows); - assert_eq!(2, reader.metrics().num_deleted_rows); - - // Does not filter deleted. - let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastRow::new(false), None); - check_reader_result( - &mut reader, - &[ - new_batch( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[11, 12], - ), - new_batch( - b"k1", - &[3, 4], - &[13, 13], - &[OpType::Put, OpType::Delete], - &[13, 14], - ), - new_batch( - b"k2", - &[1, 2], - &[20, 20], - &[OpType::Put, OpType::Delete], - &[101, 0], - ), - new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]), - ], - ) - .await; - assert_eq!(3, reader.metrics().num_unselected_rows); - assert_eq!(0, reader.metrics().num_deleted_rows); - } /// Returns a new [Batch] whose field has column id 1, 2. 
fn new_batch_multi_fields( @@ -839,235 +526,6 @@ mod tests { builder.build().unwrap() } - #[tokio::test] - async fn test_last_non_null_merge() { - let input = [ - new_batch_multi_fields( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[(Some(11), Some(11)), (None, None)], - ), - // empty batch. - new_batch_multi_fields(b"k1", &[], &[], &[], &[]), - // Duplicate with the previous batch. - new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(Some(12), None)]), - new_batch_multi_fields( - b"k1", - &[2, 3, 4], - &[10, 13, 13], - &[OpType::Put, OpType::Put, OpType::Delete], - &[(Some(2), Some(22)), (Some(13), None), (None, Some(14))], - ), - new_batch_multi_fields( - b"k2", - &[1, 2], - &[20, 20], - &[OpType::Put, OpType::Delete], - &[(Some(101), Some(101)), (None, None)], - ), - new_batch_multi_fields( - b"k2", - &[2], - &[19], - &[OpType::Put], - &[(Some(102), Some(102))], - ), - new_batch_multi_fields( - b"k3", - &[2], - &[20], - &[OpType::Put], - &[(Some(202), Some(202))], - ), - // This batch won't increase the deleted rows count as it - // is filtered out by the previous batch. (All fields are null). - new_batch_multi_fields(b"k3", &[2], &[19], &[OpType::Delete], &[(None, None)]), - ]; - - // Filter deleted. 
- let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(true), None); - check_reader_result( - &mut reader, - &[ - new_batch_multi_fields( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[(Some(11), Some(11)), (Some(12), Some(22))], - ), - new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), None)]), - new_batch_multi_fields( - b"k2", - &[1], - &[20], - &[OpType::Put], - &[(Some(101), Some(101))], - ), - new_batch_multi_fields( - b"k3", - &[2], - &[20], - &[OpType::Put], - &[(Some(202), Some(202))], - ), - ], - ) - .await; - assert_eq!(6, reader.metrics().num_unselected_rows); - assert_eq!(2, reader.metrics().num_deleted_rows); - - // Does not filter deleted. - let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(false), None); - check_reader_result( - &mut reader, - &[ - new_batch_multi_fields( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[(Some(11), Some(11)), (Some(12), Some(22))], - ), - new_batch_multi_fields( - b"k1", - &[3, 4], - &[13, 13], - &[OpType::Put, OpType::Delete], - &[(Some(13), None), (None, Some(14))], - ), - new_batch_multi_fields( - b"k2", - &[1, 2], - &[20, 20], - &[OpType::Put, OpType::Delete], - &[(Some(101), Some(101)), (None, None)], - ), - new_batch_multi_fields( - b"k3", - &[2], - &[20], - &[OpType::Put], - &[(Some(202), Some(202))], - ), - ], - ) - .await; - assert_eq!(4, reader.metrics().num_unselected_rows); - assert_eq!(0, reader.metrics().num_deleted_rows); - } - - #[tokio::test] - async fn test_last_non_null_skip_merge_single() { - let input = [new_batch_multi_fields( - b"k1", - &[1, 2, 3], - &[13, 11, 13], - &[OpType::Put, OpType::Delete, OpType::Put], - &[(Some(11), Some(11)), (None, None), (Some(13), Some(13))], - )]; - - let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(true), None); - check_reader_result( - &mut 
reader, - &[new_batch_multi_fields( - b"k1", - &[1, 3], - &[13, 13], - &[OpType::Put, OpType::Put], - &[(Some(11), Some(11)), (Some(13), Some(13))], - )], - ) - .await; - assert_eq!(1, reader.metrics().num_unselected_rows); - assert_eq!(1, reader.metrics().num_deleted_rows); - - let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(false), None); - check_reader_result(&mut reader, &input).await; - assert_eq!(0, reader.metrics().num_unselected_rows); - assert_eq!(0, reader.metrics().num_deleted_rows); - } - - #[tokio::test] - async fn test_last_non_null_skip_merge_no_null() { - let input = [ - new_batch_multi_fields( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[(Some(11), Some(11)), (Some(12), Some(12))], - ), - new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]), - new_batch_multi_fields( - b"k1", - &[2, 3], - &[9, 13], - &[OpType::Put, OpType::Put], - &[(Some(32), None), (Some(13), Some(13))], - ), - ]; - - let reader = VecBatchReader::new(&input); - let mut reader = DedupReader::new(reader, LastNonNull::new(true), None); - check_reader_result( - &mut reader, - &[ - new_batch_multi_fields( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[(Some(11), Some(11)), (Some(12), Some(12))], - ), - new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), Some(13))]), - ], - ) - .await; - assert_eq!(2, reader.metrics().num_unselected_rows); - assert_eq!(0, reader.metrics().num_deleted_rows); - } - - #[tokio::test] - async fn test_last_non_null_merge_null() { - let input = [ - new_batch_multi_fields( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[(Some(11), Some(11)), (None, None)], - ), - new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]), - new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]), - ]; - - let reader = VecBatchReader::new(&input); - let mut reader = 
DedupReader::new(reader, LastNonNull::new(true), None); - check_reader_result( - &mut reader, - &[ - new_batch_multi_fields( - b"k1", - &[1, 2], - &[13, 11], - &[OpType::Put, OpType::Put], - &[(Some(11), Some(11)), (None, Some(22))], - ), - new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]), - ], - ) - .await; - assert_eq!(1, reader.metrics().num_unselected_rows); - assert_eq!(0, reader.metrics().num_deleted_rows); - } - fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) { let mut actual = Vec::new(); let mut metrics = DedupMetrics::default(); diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs index 946f2a610c..b1c304f244 100644 --- a/src/mito2/src/read/flat_merge.rs +++ b/src/mito2/src/read/flat_merge.rs @@ -14,8 +14,9 @@ use std::cmp::Ordering; use std::collections::BinaryHeap; +use std::fmt; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use async_stream::try_stream; use common_telemetry::debug; @@ -34,7 +35,6 @@ use crate::error::{ComputeArrowSnafu, Result}; use crate::memtable::BoxedRecordBatchIterator; use crate::metrics::READ_STAGE_ELAPSED; use crate::read::BoxedRecordBatchStream; -use crate::read::merge::{MergeMetrics, MergeMetricsReport}; use crate::sst::parquet::flat_format::{ primary_key_column_index, sequence_column_index, time_index_column_index, }; @@ -105,6 +105,84 @@ struct BatchCursor { row_idx: usize, } +/// Trait for reporting merge metrics. +pub trait MergeMetricsReport: Send + Sync { + /// Reports and resets the metrics. + fn report(&self, metrics: &mut MergeMetrics); +} + +/// Metrics for the merge reader. +#[derive(Default)] +pub struct MergeMetrics { + /// Cost to initialize the reader. + pub(crate) init_cost: Duration, + /// Total scan cost of the reader. + pub(crate) scan_cost: Duration, + /// Number of times to fetch batches. + pub(crate) num_fetch_by_batches: usize, + /// Number of times to fetch rows. 
+ pub(crate) num_fetch_by_rows: usize, + /// Cost to fetch batches from sources. + pub(crate) fetch_cost: Duration, +} + +impl fmt::Debug for MergeMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.scan_cost.is_zero() { + return write!(f, "{{}}"); + } + + write!(f, r#"{{"scan_cost":"{:?}""#, self.scan_cost)?; + + if !self.init_cost.is_zero() { + write!(f, r#", "init_cost":"{:?}""#, self.init_cost)?; + } + if self.num_fetch_by_batches > 0 { + write!( + f, + r#", "num_fetch_by_batches":{}"#, + self.num_fetch_by_batches + )?; + } + if self.num_fetch_by_rows > 0 { + write!(f, r#", "num_fetch_by_rows":{}"#, self.num_fetch_by_rows)?; + } + if !self.fetch_cost.is_zero() { + write!(f, r#", "fetch_cost":"{:?}""#, self.fetch_cost)?; + } + + write!(f, "}}") + } +} + +impl MergeMetrics { + /// Merges metrics from another MergeMetrics instance. + pub(crate) fn merge(&mut self, other: &MergeMetrics) { + let MergeMetrics { + init_cost, + scan_cost, + num_fetch_by_batches, + num_fetch_by_rows, + fetch_cost, + } = other; + + self.init_cost += *init_cost; + self.scan_cost += *scan_cost; + self.num_fetch_by_batches += *num_fetch_by_batches; + self.num_fetch_by_rows += *num_fetch_by_rows; + self.fetch_cost += *fetch_cost; + } + + /// Reports the metrics if scan_cost exceeds 10ms and resets them. + pub(crate) fn maybe_report(&mut self, reporter: &Option>) { + if self.scan_cost.as_millis() > 10 + && let Some(r) = reporter + { + r.report(self); + } + } +} + /// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`] // Ports from https://github.com/apache/datafusion/blob/49.0.0/datafusion/physical-plan/src/sorts/builder.rs // Adds the `take_remaining_rows()` method. 
diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs deleted file mode 100644 index 0470e4b01a..0000000000 --- a/src/mito2/src/read/merge.rs +++ /dev/null @@ -1,982 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Merge reader implementation. - -use std::cmp::Ordering; -use std::collections::BinaryHeap; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use std::{fmt, mem}; - -use async_trait::async_trait; -use common_telemetry::debug; - -use crate::error::Result; -use crate::memtable::BoxedBatchIterator; -use crate::metrics::READ_STAGE_ELAPSED; -use crate::read::{Batch, BatchReader, BoxedBatchReader, Source}; - -/// Trait for reporting merge metrics. -pub trait MergeMetricsReport: Send + Sync { - /// Reports and resets the metrics. - fn report(&self, metrics: &mut MergeMetrics); -} - -/// Reader to merge sorted batches. -/// -/// The merge reader merges [Batch]es from multiple sources that yield sorted batches. -/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can -/// ignore op type as sequence is already unique). -/// 2. Batches from sources **must** not be empty. -/// -/// The reader won't concatenate batches. Each batch returned by the reader also doesn't -/// contain duplicate rows. But the last (primary key, timestamp) of a batch may be the same -/// as the first one in the next batch. 
-pub struct MergeReader { - /// Holds [Node]s whose key range of current batch **is** overlapped with the merge window. - /// Each node yields batches from a `source`. - /// - /// [Node] in this heap **must** not be empty. A `merge window` is the (primary key, timestamp) - /// range of the **root node** in the `hot` heap. - hot: BinaryHeap, - /// Holds `Node` whose key range of current batch **isn't** overlapped with the merge window. - /// - /// `Node` in this heap **must** not be empty. - cold: BinaryHeap, - /// Batch to output. - output_batch: Option, - /// Local metrics. - metrics: MergeMetrics, - /// Optional metrics reporter. - metrics_reporter: Option>, -} - -#[async_trait] -impl BatchReader for MergeReader { - async fn next_batch(&mut self) -> Result> { - let start = Instant::now(); - while !self.hot.is_empty() && self.output_batch.is_none() { - if self.hot.len() == 1 { - // No need to do merge sort if only one batch in the hot heap. - self.fetch_batch_from_hottest().await?; - self.metrics.num_fetch_by_batches += 1; - } else { - // We could only fetch rows that less than the next node from the hottest node. - self.fetch_rows_from_hottest().await?; - self.metrics.num_fetch_by_rows += 1; - } - } - - if let Some(batch) = self.output_batch.take() { - self.metrics.scan_cost += start.elapsed(); - self.metrics.maybe_report(&self.metrics_reporter); - Ok(Some(batch)) - } else { - // Nothing fetched. - self.metrics.scan_cost += start.elapsed(); - self.metrics.maybe_report(&self.metrics_reporter); - Ok(None) - } - } -} - -impl Drop for MergeReader { - fn drop(&mut self) { - debug!("Merge reader finished, metrics: {:?}", self.metrics); - - READ_STAGE_ELAPSED - .with_label_values(&["merge"]) - .observe(self.metrics.scan_cost.as_secs_f64()); - READ_STAGE_ELAPSED - .with_label_values(&["merge_fetch"]) - .observe(self.metrics.fetch_cost.as_secs_f64()); - - // Report any remaining metrics. 
- if let Some(reporter) = &self.metrics_reporter { - reporter.report(&mut self.metrics); - } - } -} - -impl MergeReader { - /// Creates and initializes a new [MergeReader]. - pub async fn new( - sources: Vec, - metrics_reporter: Option>, - ) -> Result { - let start = Instant::now(); - let mut metrics = MergeMetrics::default(); - - let mut cold = BinaryHeap::with_capacity(sources.len()); - let hot = BinaryHeap::with_capacity(sources.len()); - for source in sources { - let node = Node::new(source, &mut metrics).await?; - if !node.is_eof() { - // Ensure `cold` don't have eof nodes. - cold.push(node); - } - } - - let mut reader = MergeReader { - hot, - cold, - output_batch: None, - metrics, - metrics_reporter, - }; - // Initializes the reader. - reader.refill_hot(); - - let elapsed = start.elapsed(); - reader.metrics.init_cost += elapsed; - reader.metrics.scan_cost += elapsed; - Ok(reader) - } - - /// Moves nodes in `cold` heap, whose key range is overlapped with current merge - /// window to `hot` heap. - fn refill_hot(&mut self) { - while !self.cold.is_empty() { - if let Some(merge_window) = self.hot.peek() { - let warmest = self.cold.peek().unwrap(); - if warmest.is_behind(merge_window) { - // if the warmest node in the `cold` heap is totally after the - // `merge_window`, then no need to add more nodes into the `hot` - // heap for merge sorting. - break; - } - } - - let warmest = self.cold.pop().unwrap(); - self.hot.push(warmest); - } - } - - /// Fetches one batch from the hottest node. - async fn fetch_batch_from_hottest(&mut self) -> Result<()> { - assert_eq!(1, self.hot.len()); - - let mut hottest = self.hot.pop().unwrap(); - let batch = hottest.fetch_batch(&mut self.metrics).await?; - Self::maybe_output_batch(batch, &mut self.output_batch)?; - self.reheap(hottest) - } - - /// Fetches non-duplicated rows from the hottest node. 
- async fn fetch_rows_from_hottest(&mut self) -> Result<()> { - // Safety: `fetch_batches_to_output()` ensures the hot heap has more than 1 element. - // Pop hottest node. - let mut top_node = self.hot.pop().unwrap(); - let top = top_node.current_batch(); - // Min timestamp and its sequence in the next batch. - let next_min_ts = { - let next_node = self.hot.peek().unwrap(); - let next = next_node.current_batch(); - // top and next have overlapping rows so they must have same primary keys. - debug_assert_eq!(top.primary_key(), next.primary_key()); - // Safety: Batches in the heap is not empty, so we can use unwrap here. - next.first_timestamp().unwrap() - }; - - // Safety: Batches in the heap is not empty, so we can use unwrap here. - let timestamps = top.timestamps_native().unwrap(); - // Binary searches the timestamp in the top batch. - // Safety: Batches should have the same timestamp resolution so we can compare the native - // value directly. - let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) { - Ok(pos) => pos, - Err(pos) => { - // No duplicate timestamp. Outputs timestamp before `pos`. - Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?; - top_node.skip_rows(pos, &mut self.metrics).await?; - return self.reheap(top_node); - } - }; - - // No need to remove duplicate timestamps. - let output_end = if duplicate_pos == 0 { - // If the first timestamp of the top node is duplicate. We can simply return the first row - // as the heap ensure it is the one with largest sequence. - 1 - } else { - // We don't know which one has the larger sequence so we use the range before - // the duplicate pos. - duplicate_pos - }; - Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?; - top_node.skip_rows(output_end, &mut self.metrics).await?; - self.reheap(top_node) - } - - /// Push the node popped from `hot` back to a proper heap. 
- fn reheap(&mut self, node: Node) -> Result<()> { - if node.is_eof() { - // If the node is EOF, don't put it into the heap again. - // The merge window would be updated, need to refill the hot heap. - self.refill_hot(); - } else { - // Find a proper heap for this node. - let node_is_cold = if let Some(hottest) = self.hot.peek() { - // If key range of this node is behind the hottest node's then we can - // push it to the cold heap. Otherwise we should push it to the hot heap. - node.is_behind(hottest) - } else { - // The hot heap is empty, but we don't known whether the current - // batch of this node is still the hottest. - true - }; - - if node_is_cold { - self.cold.push(node); - } else { - self.hot.push(node); - } - // Anyway, the merge window has been changed, we need to refill the hot heap. - self.refill_hot(); - } - - Ok(()) - } - - /// If `filter_deleted` is set to true, removes deleted entries and sets the `batch` to the `output_batch`. - /// - /// Ignores the `batch` if it is empty. - fn maybe_output_batch(batch: Batch, output_batch: &mut Option) -> Result<()> { - debug_assert!(output_batch.is_none()); - if batch.is_empty() { - return Ok(()); - } - *output_batch = Some(batch); - - Ok(()) - } -} - -/// Builder to build and initialize a [MergeReader]. -#[derive(Default)] -pub struct MergeReaderBuilder { - /// Input sources. - /// - /// All source must yield batches with the same schema. - sources: Vec, - /// Optional metrics reporter. - metrics_reporter: Option>, -} - -impl MergeReaderBuilder { - /// Returns an empty builder. - pub fn new() -> MergeReaderBuilder { - MergeReaderBuilder::default() - } - - /// Creates a builder from sources. - pub fn from_sources(sources: Vec) -> MergeReaderBuilder { - MergeReaderBuilder { - sources, - metrics_reporter: None, - } - } - - /// Pushes a batch reader to sources. 
- pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self { - self.sources.push(Source::Reader(reader)); - self - } - - /// Pushes a batch iterator to sources. - pub fn push_batch_iter(&mut self, iter: BoxedBatchIterator) -> &mut Self { - self.sources.push(Source::Iter(iter)); - self - } - - /// Sets the metrics reporter. - pub fn with_metrics_reporter( - &mut self, - reporter: Option>, - ) -> &mut Self { - self.metrics_reporter = reporter; - self - } - - /// Builds and initializes the reader, then resets the builder. - pub async fn build(&mut self) -> Result { - let sources = mem::take(&mut self.sources); - let metrics_reporter = self.metrics_reporter.take(); - MergeReader::new(sources, metrics_reporter).await - } -} - -/// Metrics for the merge reader. -#[derive(Default)] -pub struct MergeMetrics { - /// Cost to initialize the reader. - pub(crate) init_cost: Duration, - /// Total scan cost of the reader. - pub(crate) scan_cost: Duration, - /// Number of times to fetch batches. - pub(crate) num_fetch_by_batches: usize, - /// Number of times to fetch rows. - pub(crate) num_fetch_by_rows: usize, - /// Cost to fetch batches from sources. 
- pub(crate) fetch_cost: Duration, -} - -impl fmt::Debug for MergeMetrics { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // Skip output if scan_cost is zero - if self.scan_cost.is_zero() { - return write!(f, "{{}}"); - } - - write!(f, r#"{{"scan_cost":"{:?}""#, self.scan_cost)?; - - if !self.init_cost.is_zero() { - write!(f, r#", "init_cost":"{:?}""#, self.init_cost)?; - } - if self.num_fetch_by_batches > 0 { - write!( - f, - r#", "num_fetch_by_batches":{}"#, - self.num_fetch_by_batches - )?; - } - if self.num_fetch_by_rows > 0 { - write!(f, r#", "num_fetch_by_rows":{}"#, self.num_fetch_by_rows)?; - } - if !self.fetch_cost.is_zero() { - write!(f, r#", "fetch_cost":"{:?}""#, self.fetch_cost)?; - } - - write!(f, "}}") - } -} - -impl MergeMetrics { - /// Merges metrics from another MergeMetrics instance. - pub(crate) fn merge(&mut self, other: &MergeMetrics) { - let MergeMetrics { - init_cost, - scan_cost, - num_fetch_by_batches, - num_fetch_by_rows, - fetch_cost, - } = other; - - self.init_cost += *init_cost; - self.scan_cost += *scan_cost; - self.num_fetch_by_batches += *num_fetch_by_batches; - self.num_fetch_by_rows += *num_fetch_by_rows; - self.fetch_cost += *fetch_cost; - } - - /// Reports the metrics if scan_cost exceeds 10ms and resets them. - pub(crate) fn maybe_report(&mut self, reporter: &Option>) { - if self.scan_cost.as_millis() > 10 - && let Some(r) = reporter - { - r.report(self); - } - } -} - -/// A `Node` represent an individual input data source to be merged. -struct Node { - /// Data source of this `Node`. - source: Source, - /// Current batch to be read. The node ensures the batch is not empty. - /// - /// `None` means the `source` has reached EOF. - current_batch: Option, -} - -impl Node { - /// Initialize a node. - /// - /// It tries to fetch one batch from the `source`. - async fn new(mut source: Source, metrics: &mut MergeMetrics) -> Result { - // Ensures batch is not empty. 
- let start = Instant::now(); - let current_batch = source.next_batch().await?.map(CompareFirst); - metrics.fetch_cost += start.elapsed(); - - Ok(Node { - source, - current_batch, - }) - } - - /// Returns whether the node still has batch to read. - fn is_eof(&self) -> bool { - self.current_batch.is_none() - } - - /// Returns the primary key of current batch. - /// - /// # Panics - /// Panics if the node has reached EOF. - fn primary_key(&self) -> &[u8] { - self.current_batch().primary_key() - } - - /// Returns current batch. - /// - /// # Panics - /// Panics if the node has reached EOF. - fn current_batch(&self) -> &Batch { - &self.current_batch.as_ref().unwrap().0 - } - - /// Returns current batch and fetches next batch - /// from the source. - /// - /// # Panics - /// Panics if the node has reached EOF. - async fn fetch_batch(&mut self, metrics: &mut MergeMetrics) -> Result { - let current = self.current_batch.take().unwrap(); - let start = Instant::now(); - // Ensures batch is not empty. - self.current_batch = self.source.next_batch().await?.map(CompareFirst); - metrics.fetch_cost += start.elapsed(); - Ok(current.0) - } - - /// Returns true if the key range of current batch in `self` is behind (exclusive) current - /// batch in `other`. - /// - /// # Panics - /// Panics if either `self` or `other` is EOF. - fn is_behind(&self, other: &Node) -> bool { - debug_assert!(!self.current_batch().is_empty()); - debug_assert!(!other.current_batch().is_empty()); - - // We only compare pk and timestamp so nodes in the cold - // heap don't have overlapping timestamps with the hottest node - // in the hot heap. - self.primary_key().cmp(other.primary_key()).then_with(|| { - self.current_batch() - .first_timestamp() - .cmp(&other.current_batch().last_timestamp()) - }) == Ordering::Greater - } - - /// Skips first `num_to_skip` rows from node's current batch. If current batch is empty it fetches - /// next batch from the node. - /// - /// # Panics - /// Panics if the node is EOF. 
- async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut MergeMetrics) -> Result<()> { - let batch = self.current_batch(); - debug_assert!(batch.num_rows() >= num_to_skip); - - let remaining = batch.num_rows() - num_to_skip; - if remaining == 0 { - // Nothing remains, we need to fetch next batch to ensure the batch is not empty. - self.fetch_batch(metrics).await?; - } else { - debug_assert!(!batch.is_empty()); - self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining))); - } - - Ok(()) - } -} - -impl PartialEq for Node { - fn eq(&self, other: &Node) -> bool { - self.current_batch == other.current_batch - } -} - -impl Eq for Node {} - -impl PartialOrd for Node { - fn partial_cmp(&self, other: &Node) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for Node { - fn cmp(&self, other: &Node) -> Ordering { - // The std binary heap is a max heap, but we want the nodes are ordered in - // ascend order, so we compare the nodes in reverse order. - other.current_batch.cmp(&self.current_batch) - } -} - -/// Type to compare [Batch] by first row. -/// -/// It ignores op type as sequence is enough to distinguish different rows. -struct CompareFirst(Batch); - -impl PartialEq for CompareFirst { - fn eq(&self, other: &Self) -> bool { - self.0.primary_key() == other.0.primary_key() - && self.0.first_timestamp() == other.0.first_timestamp() - && self.0.first_sequence() == other.0.first_sequence() - } -} - -impl Eq for CompareFirst {} - -impl PartialOrd for CompareFirst { - fn partial_cmp(&self, other: &CompareFirst) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for CompareFirst { - /// Compares by primary key, time index, sequence desc. 
- fn cmp(&self, other: &CompareFirst) -> Ordering { - self.0 - .primary_key() - .cmp(other.0.primary_key()) - .then_with(|| self.0.first_timestamp().cmp(&other.0.first_timestamp())) - .then_with(|| other.0.first_sequence().cmp(&self.0.first_sequence())) - } -} - -#[cfg(test)] -mod tests { - use api::v1::OpType; - - use super::*; - use crate::test_util::{VecBatchReader, check_reader_result, new_batch}; - - #[tokio::test] - async fn test_merge_reader_empty() { - let mut reader = MergeReaderBuilder::new().build().await.unwrap(); - assert!(reader.next_batch().await.unwrap().is_none()); - assert!(reader.next_batch().await.unwrap().is_none()); - } - - #[tokio::test] - async fn test_merge_non_overlapping() { - let reader1 = VecBatchReader::new(&[ - new_batch( - b"k1", - &[1, 2], - &[11, 12], - &[OpType::Put, OpType::Put], - &[21, 22], - ), - new_batch( - b"k1", - &[7, 8], - &[17, 18], - &[OpType::Put, OpType::Delete], - &[27, 28], - ), - new_batch( - b"k2", - &[2, 3], - &[12, 13], - &[OpType::Delete, OpType::Put], - &[22, 23], - ), - ]); - let reader2 = VecBatchReader::new(&[new_batch( - b"k1", - &[4, 5], - &[14, 15], - &[OpType::Put, OpType::Put], - &[24, 25], - )]); - let mut reader = MergeReaderBuilder::new() - .push_batch_reader(Box::new(reader1)) - .push_batch_iter(Box::new(reader2)) - .build() - .await - .unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch( - b"k1", - &[1, 2], - &[11, 12], - &[OpType::Put, OpType::Put], - &[21, 22], - ), - new_batch( - b"k1", - &[4, 5], - &[14, 15], - &[OpType::Put, OpType::Put], - &[24, 25], - ), - new_batch( - b"k1", - &[7, 8], - &[17, 18], - &[OpType::Put, OpType::Delete], - &[27, 28], - ), - new_batch( - b"k2", - &[2, 3], - &[12, 13], - &[OpType::Delete, OpType::Put], - &[22, 23], - ), - ], - ) - .await; - } - - #[tokio::test] - async fn test_merge_reheap_hot() { - let reader1 = VecBatchReader::new(&[ - new_batch( - b"k1", - &[1, 3], - &[10, 10], - &[OpType::Put, OpType::Put], - &[21, 23], - ), - new_batch(b"k2", 
&[3], &[10], &[OpType::Put], &[23]), - ]); - let reader2 = VecBatchReader::new(&[new_batch( - b"k1", - &[2, 4], - &[11, 11], - &[OpType::Put, OpType::Put], - &[32, 34], - )]); - let mut reader = MergeReaderBuilder::new() - .push_batch_reader(Box::new(reader1)) - .push_batch_iter(Box::new(reader2)) - .build() - .await - .unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]), - new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]), - new_batch(b"k1", &[3], &[10], &[OpType::Put], &[23]), - new_batch(b"k1", &[4], &[11], &[OpType::Put], &[34]), - new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]), - ], - ) - .await; - } - - #[tokio::test] - async fn test_merge_overlapping() { - let reader1 = VecBatchReader::new(&[ - new_batch( - b"k1", - &[1, 2], - &[11, 12], - &[OpType::Put, OpType::Put], - &[21, 22], - ), - new_batch( - b"k1", - &[4, 5], - &[14, 15], - // This override 4 and deletes 5. - &[OpType::Put, OpType::Delete], - &[24, 25], - ), - new_batch( - b"k2", - &[2, 3], - &[12, 13], - // This delete 2. 
- &[OpType::Delete, OpType::Put], - &[22, 23], - ), - ]); - let reader2 = VecBatchReader::new(&[ - new_batch( - b"k1", - &[3, 4, 5], - &[10, 10, 10], - &[OpType::Put, OpType::Put, OpType::Put], - &[33, 34, 35], - ), - new_batch( - b"k2", - &[1, 10], - &[11, 20], - &[OpType::Put, OpType::Put], - &[21, 30], - ), - ]); - let mut reader = MergeReaderBuilder::new() - .push_batch_reader(Box::new(reader1)) - .push_batch_iter(Box::new(reader2)) - .build() - .await - .unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch( - b"k1", - &[1, 2], - &[11, 12], - &[OpType::Put, OpType::Put], - &[21, 22], - ), - new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]), - new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]), - new_batch(b"k1", &[4], &[10], &[OpType::Put], &[34]), - new_batch(b"k1", &[5], &[15], &[OpType::Delete], &[25]), - new_batch(b"k1", &[5], &[10], &[OpType::Put], &[35]), - new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]), - new_batch( - b"k2", - &[2, 3], - &[12, 13], - &[OpType::Delete, OpType::Put], - &[22, 23], - ), - new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]), - ], - ) - .await; - } - - #[tokio::test] - async fn test_merge_deleted() { - let reader1 = VecBatchReader::new(&[ - new_batch( - b"k1", - &[1, 2], - &[11, 12], - &[OpType::Delete, OpType::Delete], - &[21, 22], - ), - new_batch( - b"k2", - &[2, 3], - &[12, 13], - &[OpType::Delete, OpType::Put], - &[22, 23], - ), - ]); - let reader2 = VecBatchReader::new(&[new_batch( - b"k1", - &[4, 5], - &[14, 15], - &[OpType::Delete, OpType::Delete], - &[24, 25], - )]); - let mut reader = MergeReaderBuilder::new() - .push_batch_reader(Box::new(reader1)) - .push_batch_iter(Box::new(reader2)) - .build() - .await - .unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch( - b"k1", - &[1, 2], - &[11, 12], - &[OpType::Delete, OpType::Delete], - &[21, 22], - ), - new_batch( - b"k1", - &[4, 5], - &[14, 15], - &[OpType::Delete, OpType::Delete], - &[24, 25], - ), - new_batch( - b"k2", - &[2, 3], 
- &[12, 13], - &[OpType::Delete, OpType::Put], - &[22, 23], - ), - ], - ) - .await; - } - - #[tokio::test] - async fn test_merge_next_node_empty() { - let reader1 = VecBatchReader::new(&[new_batch( - b"k1", - &[1, 2], - &[11, 12], - &[OpType::Put, OpType::Put], - &[21, 22], - )]); - let reader2 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]); - let mut reader = MergeReaderBuilder::new() - .push_batch_reader(Box::new(reader1)) - .push_batch_iter(Box::new(reader2)) - .build() - .await - .unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21]), - new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33]), - new_batch(b"k1", &[2], &[12], &[OpType::Put], &[22]), - ], - ) - .await; - } - - #[tokio::test] - async fn test_merge_top_node_empty() { - let reader1 = VecBatchReader::new(&[new_batch( - b"k1", - &[1, 2], - &[10, 10], - &[OpType::Put, OpType::Put], - &[21, 22], - )]); - let reader2 = VecBatchReader::new(&[new_batch( - b"k1", - &[2, 3], - &[11, 11], - &[OpType::Put, OpType::Put], - &[32, 33], - )]); - let mut reader = MergeReaderBuilder::new() - .push_batch_reader(Box::new(reader1)) - .push_batch_iter(Box::new(reader2)) - .build() - .await - .unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]), - new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]), - new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]), - new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]), - ], - ) - .await; - } - - #[tokio::test] - async fn test_merge_large_range() { - let reader1 = VecBatchReader::new(&[new_batch( - b"k1", - &[1, 10], - &[10, 10], - &[OpType::Put, OpType::Put], - &[21, 30], - )]); - let reader2 = VecBatchReader::new(&[new_batch( - b"k1", - &[1, 20], - &[11, 11], - &[OpType::Put, OpType::Put], - &[31, 40], - )]); - // The hot heap have a node that doesn't have duplicate - // timestamps. 
- let reader3 = VecBatchReader::new(&[new_batch( - b"k1", - &[6, 8], - &[11, 11], - &[OpType::Put, OpType::Put], - &[36, 38], - )]); - let mut reader = MergeReaderBuilder::new() - .push_batch_reader(Box::new(reader1)) - .push_batch_iter(Box::new(reader2)) - .push_batch_reader(Box::new(reader3)) - .build() - .await - .unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]), - new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]), - new_batch( - b"k1", - &[6, 8], - &[11, 11], - &[OpType::Put, OpType::Put], - &[36, 38], - ), - new_batch(b"k1", &[10], &[10], &[OpType::Put], &[30]), - new_batch(b"k1", &[20], &[11], &[OpType::Put], &[40]), - ], - ) - .await; - } - - #[tokio::test] - async fn test_merge_many_duplicates() { - let mut builder = MergeReaderBuilder::new(); - for i in 0..10 { - let batches: Vec<_> = (0..8) - .map(|ts| new_batch(b"k1", &[ts], &[i], &[OpType::Put], &[100])) - .collect(); - let reader = VecBatchReader::new(&batches); - builder.push_batch_reader(Box::new(reader)); - } - let mut reader = builder.build().await.unwrap(); - let mut expect = Vec::with_capacity(80); - for ts in 0..8 { - for i in 0..10 { - let batch = new_batch(b"k1", &[ts], &[9 - i], &[OpType::Put], &[100]); - expect.push(batch); - } - } - check_reader_result(&mut reader, &expect).await; - } - - #[tokio::test] - async fn test_merge_keep_duplicate() { - let reader1 = VecBatchReader::new(&[new_batch( - b"k1", - &[1, 2], - &[10, 10], - &[OpType::Put, OpType::Put], - &[21, 22], - )]); - let reader2 = VecBatchReader::new(&[new_batch( - b"k1", - &[2, 3], - &[11, 11], - &[OpType::Put, OpType::Put], - &[32, 33], - )]); - let sources = vec![ - Source::Reader(Box::new(reader1)), - Source::Iter(Box::new(reader2)), - ]; - let mut reader = MergeReaderBuilder::from_sources(sources) - .build() - .await - .unwrap(); - check_reader_result( - &mut reader, - &[ - new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]), - new_batch(b"k1", &[2], &[11], 
&[OpType::Put], &[32]), - new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]), - new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]), - ], - ) - .await; - } -} diff --git a/src/mito2/src/read/plain_batch.rs b/src/mito2/src/read/plain_batch.rs deleted file mode 100644 index f22b6688d6..0000000000 --- a/src/mito2/src/read/plain_batch.rs +++ /dev/null @@ -1,505 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Batch without an encoded primary key. - -use std::collections::HashMap; -use std::sync::Arc; - -use api::v1::OpType; -use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt8Array, UInt64Array}; -use datatypes::arrow::compute::filter_record_batch; -use datatypes::arrow::datatypes::SchemaRef; -use datatypes::arrow::record_batch::RecordBatch; -use snafu::{OptionExt, ResultExt}; -use store_api::metadata::{ColumnMetadata, RegionMetadata}; -use store_api::storage::{RegionId, SequenceNumber}; - -use crate::error::{ - ComputeArrowSnafu, CreateDefaultSnafu, InvalidRequestSnafu, NewRecordBatchSnafu, Result, - UnexpectedSnafu, -}; - -/// Number of columns that have fixed positions. -/// -/// Contains all internal columns. -pub(crate) const PLAIN_FIXED_POS_COLUMN_NUM: usize = 2; - -/// [PlainBatch] represents a batch of rows. -/// It is a wrapper around [RecordBatch]. -/// -/// The columns order is the same as the order of the columns read from the SST. -/// It always contains two internal columns now. 
We may change modify this behavior -/// in the future. -#[derive(Debug)] -pub struct PlainBatch { - /// The original record batch. - record_batch: RecordBatch, -} - -impl PlainBatch { - /// Creates a new [PlainBatch] from a [RecordBatch]. - pub fn new(record_batch: RecordBatch) -> Self { - assert!( - record_batch.num_columns() >= 2, - "record batch missing internal columns, num_columns: {}", - record_batch.num_columns() - ); - - Self { record_batch } - } - - /// Returns a new [PlainBatch] with the given columns. - pub fn with_new_columns(&self, columns: Vec) -> Result { - let record_batch = RecordBatch::try_new(self.record_batch.schema(), columns) - .context(NewRecordBatchSnafu)?; - Ok(Self::new(record_batch)) - } - - /// Returns the number of columns in the batch. - pub fn num_columns(&self) -> usize { - self.record_batch.num_columns() - } - - /// Returns the number of rows in the batch. - pub fn num_rows(&self) -> usize { - self.record_batch.num_rows() - } - - /// Returns true if the batch is empty. - pub fn is_empty(&self) -> bool { - self.num_rows() == 0 - } - - /// Returns all columns. - pub fn columns(&self) -> &[ArrayRef] { - self.record_batch.columns() - } - - /// Returns the array of column at index `idx`. - pub fn column(&self, idx: usize) -> &ArrayRef { - self.record_batch.column(idx) - } - - /// Returns the slice of internal columns. - pub fn internal_columns(&self) -> &[ArrayRef] { - &self.record_batch.columns()[self.record_batch.num_columns() - PLAIN_FIXED_POS_COLUMN_NUM..] - } - - /// Returns the inner record batch. - pub fn as_record_batch(&self) -> &RecordBatch { - &self.record_batch - } - - /// Converts this batch into a record batch. - pub fn into_record_batch(self) -> RecordBatch { - self.record_batch - } - - /// Filters this batch by the boolean array. 
- pub fn filter(&self, predicate: &BooleanArray) -> Result { - let record_batch = - filter_record_batch(&self.record_batch, predicate).context(ComputeArrowSnafu)?; - Ok(Self::new(record_batch)) - } - - /// Returns the column index of the sequence column. - #[allow(dead_code)] - pub(crate) fn sequence_column_index(&self) -> usize { - self.record_batch.num_columns() - PLAIN_FIXED_POS_COLUMN_NUM - } -} - -/// Helper struct to fill default values and internal columns. -pub struct ColumnFiller<'a> { - /// Region metadata information - metadata: &'a RegionMetadata, - /// Schema for the output record batch - schema: SchemaRef, - /// Map of column names to indices in the input record batch - name_to_index: HashMap, -} - -impl<'a> ColumnFiller<'a> { - /// Creates a new ColumnFiller - /// The `schema` is the sst schema of the `metadata`. - pub fn new( - metadata: &'a RegionMetadata, - schema: SchemaRef, - record_batch: &RecordBatch, - ) -> Self { - debug_assert_eq!(metadata.column_metadatas.len() + 2, schema.fields().len()); - - // Pre-construct the name to index map - let name_to_index: HashMap<_, _> = record_batch - .schema() - .fields() - .iter() - .enumerate() - .map(|(i, field)| (field.name().clone(), i)) - .collect(); - - Self { - metadata, - schema, - name_to_index, - } - } - - /// Fills default values and internal columns for a [RecordBatch]. - pub fn fill_missing_columns( - &self, - record_batch: &RecordBatch, - sequence: SequenceNumber, - op_type: OpType, - ) -> Result { - let num_rows = record_batch.num_rows(); - let mut new_columns = - Vec::with_capacity(record_batch.num_columns() + PLAIN_FIXED_POS_COLUMN_NUM); - - // Fills default values. - // Implementation based on `WriteRequest::fill_missing_columns()`. 
- for column in &self.metadata.column_metadatas { - let array = match self.name_to_index.get(&column.column_schema.name) { - Some(index) => record_batch.column(*index).clone(), - None => match op_type { - OpType::Put => { - // For put requests, we use the default value from column schema. - fill_column_put_default(self.metadata.region_id, column, num_rows)? - } - OpType::Delete => { - // For delete requests, we need default value for padding. - fill_column_delete_default(column, num_rows)? - } - }, - }; - - new_columns.push(array); - } - - // Adds internal columns. - // Adds the sequence number. - let sequence_array = Arc::new(UInt64Array::from(vec![sequence; num_rows])); - // Adds the op type. - let op_type_array = Arc::new(UInt8Array::from(vec![op_type as u8; num_rows])); - new_columns.push(sequence_array); - new_columns.push(op_type_array); - - RecordBatch::try_new(self.schema.clone(), new_columns).context(NewRecordBatchSnafu) - } -} - -fn fill_column_put_default( - region_id: RegionId, - column: &ColumnMetadata, - num_rows: usize, -) -> Result { - if column.column_schema.is_default_impure() { - return UnexpectedSnafu { - reason: format!( - "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}", - region_id, - column.column_schema.name, - column.column_schema.default_constraint(), - ), - } - .fail(); - } - let vector = column - .column_schema - .create_default_vector(num_rows) - .context(CreateDefaultSnafu { - region_id, - column: &column.column_schema.name, - })? - // This column doesn't have default value. 
- .with_context(|| InvalidRequestSnafu { - region_id, - reason: format!( - "column {} does not have default value", - column.column_schema.name - ), - })?; - Ok(vector.to_arrow_array()) -} - -fn fill_column_delete_default(column: &ColumnMetadata, num_rows: usize) -> Result { - // For delete requests, we need a default value for padding - let vector = column - .column_schema - .create_default_vector_for_padding(num_rows); - Ok(vector.to_arrow_array()) -} - -#[cfg(test)] -mod tests { - use api::v1::SemanticType; - use datatypes::arrow::array::{ - Float64Array, Int32Array, StringArray, TimestampMillisecondArray, - }; - use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; - use datatypes::schema::ColumnSchema; - use datatypes::schema::constraint::ColumnDefaultConstraint; - use datatypes::value::Value; - use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; - use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME}; - use store_api::storage::{ConcreteDataType, RegionId}; - - use super::*; - use crate::sst::to_plain_sst_arrow_schema; - - /// Creates a test region metadata with schema: k0(string), ts(timestamp), v1(float64) - fn create_test_region_metadata() -> RegionMetadata { - let mut builder = RegionMetadataBuilder::new(RegionId::new(100, 200)); - builder - // Add string key column - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false) - .with_default_constraint(None) - .unwrap(), - semantic_type: SemanticType::Tag, - column_id: 0, - }) - // Add timestamp column - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ) - .with_time_index(true) - .with_default_constraint(None) - .unwrap(), - semantic_type: SemanticType::Timestamp, - column_id: 1, - }) - // Add float value column with default - .push_column_metadata(ColumnMetadata { - column_schema: 
ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true) - .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Float64( - datatypes::value::OrderedFloat::from(42.0), - )))) - .unwrap(), - semantic_type: SemanticType::Field, - column_id: 2, - }) - .primary_key(vec![0]); - - builder.build().unwrap() - } - - #[test] - fn test_column_filler_put() { - let region_metadata = create_test_region_metadata(); - let output_schema = to_plain_sst_arrow_schema(®ion_metadata); - - // Create input record batch with only k0 and ts columns (v1 is missing) - let input_schema = Arc::new(Schema::new(vec![ - Field::new("k0", DataType::Utf8, false), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - ])); - - let k0_values: ArrayRef = Arc::new(StringArray::from(vec!["key1", "key2"])); - let ts_values: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000])); - - let input_batch = - RecordBatch::try_new(input_schema, vec![k0_values.clone(), ts_values.clone()]).unwrap(); - - // Create column filler - let filler = ColumnFiller::new(®ion_metadata, output_schema.clone(), &input_batch); - - // Fill missing columns with OpType::Put - let result = filler - .fill_missing_columns(&input_batch, 100, OpType::Put) - .unwrap(); - - // Verify the result - // Create an expected record batch to compare against - let expected_columns = vec![ - k0_values.clone(), - ts_values.clone(), - Arc::new(Float64Array::from(vec![42.0, 42.0])), - Arc::new(UInt64Array::from(vec![100, 100])), - Arc::new(UInt8Array::from(vec![OpType::Put as u8, OpType::Put as u8])), - ]; - let expected_batch = RecordBatch::try_new(output_schema.clone(), expected_columns).unwrap(); - assert_eq!(expected_batch, result); - } - - #[test] - fn test_column_filler_delete() { - let region_metadata = create_test_region_metadata(); - let output_schema = to_plain_sst_arrow_schema(®ion_metadata); - - // Create input record batch with only k0 and ts columns (v1 is missing) 
- let input_schema = Arc::new(Schema::new(vec![ - Field::new("k0", DataType::Utf8, false), - Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - ), - ])); - - let k0_values: ArrayRef = Arc::new(StringArray::from(vec!["key1", "key2"])); - let ts_values: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000])); - - let input_batch = - RecordBatch::try_new(input_schema, vec![k0_values.clone(), ts_values.clone()]).unwrap(); - - // Create column filler - let filler = ColumnFiller::new(®ion_metadata, output_schema.clone(), &input_batch); - - // Fill missing columns with OpType::Delete - let result = filler - .fill_missing_columns(&input_batch, 200, OpType::Delete) - .unwrap(); - - // Verify the result by creating an expected record batch to compare against - let v1_default = Arc::new(Float64Array::from(vec![None, None])); - let expected_columns = vec![ - k0_values.clone(), - ts_values.clone(), - v1_default, - Arc::new(UInt64Array::from(vec![200, 200])), - Arc::new(UInt8Array::from(vec![ - OpType::Delete as u8, - OpType::Delete as u8, - ])), - ]; - let expected_batch = RecordBatch::try_new(output_schema.clone(), expected_columns).unwrap(); - assert_eq!(expected_batch, result); - } - - fn create_test_record_batch() -> RecordBatch { - let schema = Arc::new(Schema::new(vec![ - Field::new("col1", DataType::Int32, false), - Field::new("col2", DataType::Utf8, false), - Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false), - Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false), - ])); - - let col1 = Arc::new(Int32Array::from(vec![1, 2, 3])); - let col2 = Arc::new(StringArray::from(vec!["a", "b", "c"])); - let sequence = Arc::new(UInt64Array::from(vec![100, 101, 102])); - let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1])); - - RecordBatch::try_new(schema, vec![col1, col2, sequence, op_type]).unwrap() - } - - #[test] - fn test_plain_batch_basic_methods() { - let record_batch = create_test_record_batch(); - let plain_batch = 
PlainBatch::new(record_batch.clone()); - - // Test basic properties - assert_eq!(plain_batch.num_columns(), 4); - assert_eq!(plain_batch.num_rows(), 3); - assert!(!plain_batch.is_empty()); - assert_eq!(plain_batch.columns().len(), 4); - - // Test internal columns access - let internal_columns = plain_batch.internal_columns(); - assert_eq!(internal_columns.len(), PLAIN_FIXED_POS_COLUMN_NUM); - assert_eq!(internal_columns[0].len(), 3); - assert_eq!(internal_columns[1].len(), 3); - - // Test column access - let col1 = plain_batch.column(0); - assert_eq!(col1.len(), 3); - assert_eq!( - col1.as_any().downcast_ref::().unwrap().value(0), - 1 - ); - - // Test sequence column index - assert_eq!(plain_batch.sequence_column_index(), 2); - - // Test to record batch. - assert_eq!(record_batch, *plain_batch.as_record_batch()); - assert_eq!(record_batch, plain_batch.into_record_batch()); - } - - #[test] - fn test_with_new_columns() { - let record_batch = create_test_record_batch(); - let plain_batch = PlainBatch::new(record_batch); - - // Create new columns - let col1 = Arc::new(Int32Array::from(vec![10, 20, 30])); - let col2 = Arc::new(StringArray::from(vec!["x", "y", "z"])); - let sequence = Arc::new(UInt64Array::from(vec![200, 201, 202])); - let op_type = Arc::new(UInt8Array::from(vec![0, 0, 0])); - - let new_batch = plain_batch - .with_new_columns(vec![col1, col2, sequence, op_type]) - .unwrap(); - - assert_eq!(new_batch.num_columns(), 4); - assert_eq!(new_batch.num_rows(), 3); - assert_eq!( - new_batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap() - .value(0), - 10 - ); - assert_eq!( - new_batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap() - .value(0), - "x" - ); - } - - #[test] - fn test_filter() { - let record_batch = create_test_record_batch(); - let plain_batch = PlainBatch::new(record_batch); - - // Create a predicate that selects the first and third rows - let predicate = BooleanArray::from(vec![true, false, true]); - - let filtered_batch = 
plain_batch.filter(&predicate).unwrap(); - - assert_eq!(filtered_batch.num_rows(), 2); - assert_eq!( - filtered_batch - .column(0) - .as_any() - .downcast_ref::<Int32Array>() - .unwrap() - .value(0), - 1 - ); - assert_eq!( - filtered_batch - .column(0) - .as_any() - .downcast_ref::<Int32Array>() - .unwrap() - .value(1), - 3 - ); - } -} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index c447685822..f645e3dc26 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -60,7 +60,7 @@ use crate::read::seq_scan::SeqScan; use crate::read::series_scan::SeriesScan; use crate::read::stream::ScanBatchStream; use crate::read::unordered_scan::UnorderedScan; -use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source}; +use crate::read::{BoxedRecordBatchStream, RecordBatch}; use crate::region::options::MergeMode; use crate::region::version::VersionRef; use crate::sst::file::FileHandle; @@ -1031,39 +1031,6 @@ impl ScanInput { self } - /// Scans sources in parallel. - /// - /// # Panics if the input doesn't allow parallel scan. - #[tracing::instrument( - skip(self, sources, semaphore), - fields( - region_id = %self.region_metadata().region_id, - source_count = sources.len() - ) - )] - pub(crate) fn create_parallel_sources( - &self, - sources: Vec<Source>, - semaphore: Arc<Semaphore>, - channel_size: usize, - ) -> Result<Vec<Source>> { - if sources.len() <= 1 { - return Ok(sources); - } - - // Spawn a task for each source. - let sources = sources - .into_iter() - .map(|source| { - let (sender, receiver) = mpsc::channel(channel_size); - self.spawn_scan_task(source, semaphore.clone(), sender); - let stream = Box::pin(ReceiverStream::new(receiver)); - Source::Stream(stream) - }) - .collect(); - Ok(sources) - } - /// Builds memtable ranges to scan by `index`.
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> { let memtable = &self.memtables[index.index]; @@ -1173,49 +1140,6 @@ impl ScanInput { Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection)) } - /// Scans the input source in another task and sends batches to the sender. - #[tracing::instrument( - skip(self, input, semaphore, sender), - fields(region_id = %self.region_metadata().region_id) - )] - pub(crate) fn spawn_scan_task( - &self, - mut input: Source, - semaphore: Arc<Semaphore>, - sender: mpsc::Sender<Result<Batch>>, - ) { - let region_id = self.region_metadata().region_id; - let span = tracing::info_span!( - "ScanInput::parallel_scan_task", - region_id = %region_id, - stream_kind = "batch" - ); - common_runtime::spawn_global( - async move { - loop { - // We release the permit before sending result to avoid the task waiting on - // the channel with the permit held. - let maybe_batch = { - // Safety: We never close the semaphore. - let _permit = semaphore.acquire().await.unwrap(); - input.next_batch().await - }; - match maybe_batch { - Ok(Some(batch)) => { - let _ = sender.send(Ok(batch)).await; - } - Ok(None) => break, - Err(e) => { - let _ = sender.send(Err(e)).await; - break; - } - } - } - } - .instrument(span), - ); - } - /// Scans flat sources (RecordBatch streams) in parallel. /// /// # Panics if the input doesn't allow parallel scan.
diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 597f592de6..8fc946b3d3 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -39,7 +39,7 @@ use crate::metrics::{ READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED, }; use crate::read::dedup::{DedupMetrics, DedupMetricsReport}; -use crate::read::merge::{MergeMetrics, MergeMetricsReport}; +use crate::read::flat_merge::{MergeMetrics, MergeMetricsReport}; use crate::read::pruner::PartitionPruner; use crate::read::range::{RangeMeta, RowGroupIndex}; use crate::read::scan_region::StreamContext; diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 94bc1feea8..c769f78c6c 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -218,34 +218,6 @@ pub(crate) fn internal_fields() -> [FieldRef; 3] { ] } -/// Gets the arrow schema to store in parquet. -pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef { - let fields = Fields::from_iter( - metadata - .schema - .arrow_schema() - .fields() - .iter() - .cloned() - .chain(plain_internal_fields()), - ); - - Arc::new(Schema::new(fields)) -} - -/// Fields for internal columns. -fn plain_internal_fields() -> [FieldRef; 2] { - // Internal columns are always not null. - [ - Arc::new(Field::new( - SEQUENCE_COLUMN_NAME, - ArrowDataType::UInt64, - false, - )), - Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)), - ] -} - /// Gets the estimated number of series from record batches. 
/// /// This struct tracks the last timestamp value to detect series boundaries diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index e9515030c0..84f15ad837 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -36,10 +36,10 @@ use store_api::metric_engine_consts::{ use store_api::storage::consts::ReservedColumnId; use store_api::storage::{FileId, RegionId}; -use crate::read::{Batch, FlatSource, Source}; +use crate::read::{Batch, FlatSource}; use crate::sst::file::{FileHandle, FileMeta}; use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; -use crate::test_util::{VecBatchReader, new_batch_builder, new_noop_file_purger}; +use crate::test_util::{new_batch_builder, new_noop_file_purger}; /// Test region id. const REGION_ID: RegionId = RegionId::new(0, 0); @@ -190,12 +190,6 @@ pub fn new_sparse_primary_key( buffer } -/// Creates a [Source] from `batches`. -pub fn new_source(batches: &[Batch]) -> Source { - let reader = VecBatchReader::new(batches); - Source::Reader(Box::new(reader)) -} - /// Creates a SST file handle with provided file id pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) -> FileHandle { let file_purger = new_noop_file_purger();