refactor(mito2): remove dead scan code (#7925)

* refactor(mito2): remove dead batch parallel scan helpers

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor(mito2): remove dead merge reader path

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor(mito2): remove dead batch dedup reader

Signed-off-by: evenyag <realevenyag@gmail.com>

* test(mito2): remove obsolete batch source helper

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove unused plain batch

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-10 11:12:33 +08:00
committed by GitHub
parent e9d783cccf
commit fd94f55193
11 changed files with 94 additions and 2292 deletions

View File

@@ -21,11 +21,7 @@ use criterion::{Criterion, criterion_group, criterion_main};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use mito2::memtable::{IterBuilder, KeyValues, Memtable, MemtableRanges, RangesOptions};
use mito2::read;
use mito2::read::Source;
use mito2::read::dedup::DedupReader;
use mito2::read::merge::MergeReaderBuilder;
use mito2::memtable::{IterBuilder, KeyValues, Memtable, RangesOptions};
use mito2::region::options::MergeMode;
use mito2::test_util::column_metadata_to_column_schema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
@@ -126,36 +122,6 @@ fn create_memtable_with_rows(num_batches: usize) -> SimpleBulkMemtable {
}
async fn flush(mem: &SimpleBulkMemtable) {
let MemtableRanges { ranges, .. } = mem.ranges(None, RangesOptions::for_flush()).unwrap();
let mut source = if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();
let iter = only_range.build_iter().unwrap();
Source::Iter(iter)
} else {
let sources = ranges
.into_values()
.map(|r| r.build_iter().map(Source::Iter))
.collect::<mito2::error::Result<Vec<_>>>()
.unwrap();
let merge_reader = MergeReaderBuilder::from_sources(sources)
.build()
.await
.unwrap();
let reader = Box::new(DedupReader::new(
merge_reader,
read::dedup::LastRow::new(true),
None,
));
Source::Reader(reader)
};
while let Some(b) = source.next_batch().await.unwrap() {
black_box(b);
}
}
async fn flush_original(mem: &SimpleBulkMemtable) {
let iter = mem
.ranges(None, RangesOptions::default())
.unwrap()
@@ -179,19 +145,10 @@ fn bench_ranges_parallel_vs_sequential(c: &mut Criterion) {
let total_rows_k = num_batch * 10;
let memtable = create_memtable_with_rows(num_batch);
group.bench_with_input(
BenchmarkId::new("flush_by_merge_reader", format!("{}k_rows", total_rows_k)),
&memtable,
|b, memtable| b.to_async(&rt).iter(|| async { flush(memtable).await }),
);
group.bench_with_input(
BenchmarkId::new("flush_by_iter", format!("{}k_rows", total_rows_k)),
&memtable,
|b, memtable| {
b.to_async(&rt)
.iter(|| async { flush_original(memtable).await })
},
|b, memtable| b.to_async(&rt).iter(|| async { flush(memtable).await }),
);
}

View File

@@ -421,10 +421,6 @@ mod tests {
use store_api::storage::{RegionId, SequenceNumber, SequenceRange};
use super::*;
use crate::read;
use crate::read::dedup::DedupReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::{BatchReader, Source};
use crate::region::options::MergeMode;
use crate::test_util::column_metadata_to_column_schema;
@@ -621,81 +617,6 @@ mod tests {
assert_eq!(1, batch.num_rows());
}
/// Writes a `Put` and then a `Delete` for the same row (calling `freeze()` in between),
/// then merges and deduplicates every range of the memtable and expects exactly one row.
#[tokio::test]
async fn test_write_dedup() {
    let memtable = new_test_memtable(true, MergeMode::LastRow);

    // First write: put the row, then freeze the memtable.
    let put_kvs = build_key_values(
        &memtable.region_metadata,
        0,
        &[(1, 1.0, "a".to_string())],
        OpType::Put,
    );
    memtable.write_one(put_kvs.iter().next().unwrap()).unwrap();
    memtable.freeze().unwrap();

    // Second write: delete the same row.
    let delete_kvs = build_key_values(
        &memtable.region_metadata,
        1,
        &[(1, 1.0, "a".to_string())],
        OpType::Delete,
    );
    memtable
        .write_one(delete_kvs.iter().next().unwrap())
        .unwrap();

    // Turn every range into a source and read them through merge + dedup.
    let memtable_ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
    let sources: Vec<_> = memtable_ranges
        .ranges
        .values()
        .map(|range| Source::Iter(range.build_iter().unwrap()))
        .collect();
    let merged = MergeReaderBuilder::from_sources(sources)
        .build()
        .await
        .unwrap();
    let mut deduped = DedupReader::new(merged, read::dedup::LastRow::new(false), None);

    let mut total_rows = 0;
    while let Some(batch) = deduped.next_batch().await.unwrap() {
        total_rows += batch.num_rows();
    }
    assert_eq!(total_rows, 1);
}
/// Writes a single `Delete` row, freezes the memtable and checks that the row is still
/// returned (as a delete) when the dedup strategy does not filter deleted rows.
#[tokio::test]
async fn test_delete_only() {
    let memtable = new_test_memtable(true, MergeMode::LastRow);

    let delete_kvs = build_key_values(
        &memtable.region_metadata,
        0,
        &[(1, 1.0, "a".to_string())],
        OpType::Delete,
    );
    memtable
        .write_one(delete_kvs.iter().next().unwrap())
        .unwrap();
    memtable.freeze().unwrap();

    // Turn every range into a source and read them through merge + dedup.
    let memtable_ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
    let sources: Vec<_> = memtable_ranges
        .ranges
        .values()
        .map(|range| Source::Iter(range.build_iter().unwrap()))
        .collect();
    let merged = MergeReaderBuilder::from_sources(sources)
        .build()
        .await
        .unwrap();
    // `LastRow::new(false)` keeps deleted rows in the output.
    let mut deduped = DedupReader::new(merged, read::dedup::LastRow::new(false), None);

    let mut total_rows = 0;
    while let Some(batch) = deduped.next_batch().await.unwrap() {
        total_rows += batch.num_rows();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.op_types().get_data(0).unwrap(), OpType::Delete as u8);
    }
    assert_eq!(total_rows, 1);
}
#[tokio::test]
async fn test_single_range() {
let memtable = new_test_memtable(true, MergeMode::LastRow);
@@ -902,8 +823,8 @@ mod tests {
.unwrap()
}
#[tokio::test]
async fn test_write_read_large_string() {
#[test]
fn test_write_read_large_string() {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
@@ -948,25 +869,12 @@ mod tests {
.unwrap();
let MemtableRanges { ranges, .. } =
memtable.ranges(None, RangesOptions::default()).unwrap();
let mut source = if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();
Source::Iter(only_range.build_iter().unwrap())
} else {
let sources = ranges
.into_values()
.map(|r| r.build_iter().map(Source::Iter))
.collect::<error::Result<Vec<_>>>()
.unwrap();
let merge_reader = MergeReaderBuilder::from_sources(sources)
.build()
.await
.unwrap();
Source::Reader(Box::new(merge_reader))
};
let mut rows = 0;
while let Some(b) = source.next_batch().await.unwrap() {
rows += b.num_rows();
for range in ranges.into_values() {
let iter = range.build_iter().unwrap();
for batch in iter {
rows += batch.unwrap().num_rows();
}
}
assert_eq!(rows, 2);
}

View File

@@ -21,8 +21,6 @@ pub mod flat_dedup;
pub mod flat_merge;
pub mod flat_projection;
pub mod last_row;
pub mod merge;
pub mod plain_batch;
pub mod projection;
pub(crate) mod prune;
pub(crate) mod pruner;

View File

@@ -19,17 +19,13 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use api::v1::OpType;
use async_trait::async_trait;
use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use datatypes::prelude::ScalarVector;
use datatypes::value::Value;
use datatypes::vectors::MutableVector;
use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::read::{Batch, BatchColumn};
/// Trait for reporting dedup metrics.
pub trait DedupMetricsReport: Send + Sync {
@@ -37,80 +33,6 @@ pub trait DedupMetricsReport: Send + Sync {
fn report(&self, metrics: &mut DedupMetrics);
}
/// A reader that deduplicates sorted batches from a source based on the
/// dedup strategy.
pub struct DedupReader<R, S> {
    /// Underlying reader that yields the batches to deduplicate.
    source: R,
    /// Strategy that decides which rows of each batch survive.
    strategy: S,
    /// Metrics collected while deduplicating.
    metrics: DedupMetrics,
    /// Optional metrics reporter.
    metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
}

impl<R, S> DedupReader<R, S> {
    /// Creates a new dedup reader.
    ///
    /// Metrics start from [`DedupMetrics::default()`]; `metrics_reporter` may be `None`
    /// to disable reporting.
    pub fn new(
        source: R,
        strategy: S,
        metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
    ) -> Self {
        Self {
            source,
            strategy,
            metrics: DedupMetrics::default(),
            metrics_reporter,
        }
    }
}

impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
    /// Returns the next deduplicated batch.
    ///
    /// Pulls batches from the source until the strategy emits one. Once the source is
    /// exhausted, returns whatever the strategy's `finish()` yields.
    async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
        while let Some(batch) = self.source.next_batch().await? {
            if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
                self.metrics.maybe_report(&self.metrics_reporter);
                return Ok(Some(batch));
            }
        }

        let result = self.strategy.finish(&mut self.metrics)?;
        self.metrics.maybe_report(&self.metrics_reporter);
        Ok(result)
    }
}

#[async_trait]
impl<R: BatchReader, S: DedupStrategy> BatchReader for DedupReader<R, S> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        self.fetch_next_batch().await
    }
}

impl<R, S> Drop for DedupReader<R, S> {
    fn drop(&mut self) {
        debug!("Dedup reader finished, metrics: {:?}", self.metrics);

        MERGE_FILTER_ROWS_TOTAL
            .with_label_values(&["dedup"])
            .inc_by(self.metrics.num_unselected_rows as u64);
        // The "delete" label counts rows removed because they were deleted, so it must
        // use `num_deleted_rows` rather than repeating the "dedup" counter.
        MERGE_FILTER_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(self.metrics.num_deleted_rows as u64);

        // Report any remaining metrics.
        if let Some(reporter) = &self.metrics_reporter {
            reporter.report(&mut self.metrics);
        }
    }
}

#[cfg(test)]
impl<R, S> DedupReader<R, S> {
    /// Returns the metrics collected so far (test only).
    fn metrics(&self) -> &DedupMetrics {
        &self.metrics
    }
}
/// Strategy to remove duplicate rows from sorted batches.
pub trait DedupStrategy: Send {
/// Pushes a batch to the dedup strategy.
@@ -124,114 +46,6 @@ pub trait DedupStrategy: Send {
fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>>;
}
/// State of the last row in a batch for dedup.
struct BatchLastRow {
/// Primary key of the batch.
primary_key: Vec<u8>,
/// The last timestamp of the batch.
timestamp: Timestamp,
}
/// Dedup strategy that keeps the row with the latest sequence of each key.
///
/// The strategy relies on properties of the SST files, the memtables and the merge
/// reader: batches from files and memtables are assumed not to contain duplicate rows,
/// and the merge reader is assumed never to concatenate batches from different sources.
/// Under those assumptions only the first row of a batch can duplicate the last row of
/// the previous batch.
///
/// We might implement a new strategy if we need to process files with duplicate rows.
pub struct LastRow {
    /// Key and last timestamp of the previously pushed batch, compared against the
    /// first row of the next batch.
    prev_batch: Option<BatchLastRow>,
    /// Whether to filter deleted rows.
    filter_deleted: bool,
}

impl LastRow {
    /// Creates a new strategy with the given `filter_deleted` flag.
    pub fn new(filter_deleted: bool) -> Self {
        Self {
            prev_batch: None,
            filter_deleted,
        }
    }
}

impl DedupStrategy for LastRow {
    /// Drops the first row of `batch` if it has the same key and timestamp as the last
    /// row of the previous batch, optionally filters deleted rows, and returns the
    /// remaining rows (or `None` if nothing is left).
    fn push_batch(
        &mut self,
        mut batch: Batch,
        metrics: &mut DedupMetrics,
    ) -> Result<Option<Batch>> {
        let timer = Instant::now();

        if batch.is_empty() {
            return Ok(None);
        }
        debug_assert!(batch.first_timestamp().is_some());

        // Only the previous batch of the same key matters; a different key means
        // this is the first batch of a new key.
        let prev_timestamp = self
            .prev_batch
            .as_ref()
            .filter(|prev| prev.primary_key == batch.primary_key())
            .map(|prev| prev.timestamp);

        if batch.first_timestamp() == prev_timestamp {
            // The first row duplicates the last row of the previous batch.
            metrics.num_unselected_rows += 1;
            let remaining = batch.num_rows() - 1;
            if remaining == 0 {
                // `prev_batch` stays as is: the key and timestamp are unchanged.
                metrics.dedup_cost += timer.elapsed();
                return Ok(None);
            }
            // Skips the first row.
            batch = batch.slice(1, remaining);
        }

        // Remember the last row before filtering: filtering may remove rows with
        // `OpType::Delete`, and then the stored `last row` would be incorrect.
        let last_timestamp = batch.last_timestamp().unwrap();
        match &mut self.prev_batch {
            Some(prev) => {
                // Reuse the primary key buffer.
                prev.primary_key.clone_from(&batch.primary_key);
                prev.timestamp = last_timestamp;
            }
            None => {
                self.prev_batch = Some(BatchLastRow {
                    primary_key: batch.primary_key().to_vec(),
                    timestamp: last_timestamp,
                });
            }
        }

        if self.filter_deleted {
            filter_deleted_from_batch(&mut batch, metrics)?;
        }

        metrics.dedup_cost += timer.elapsed();

        // The batch can become empty if all rows are deleted.
        Ok((!batch.is_empty()).then_some(batch))
    }

    /// This strategy never buffers rows, so there is nothing left to emit.
    fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
        Ok(None)
    }
}
/// Removes deleted rows from the batch and updates metrics.
fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> Result<()> {
let num_rows = batch.num_rows();
@@ -672,137 +486,10 @@ impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
mod tests {
use std::sync::Arc;
use api::v1::OpType;
use datatypes::arrow::array::{TimestampMillisecondArray, UInt8Array, UInt64Array};
use super::*;
use crate::read::BatchBuilder;
use crate::test_util::{VecBatchReader, check_reader_result, new_batch};
/// Input without duplicate or deleted rows must pass through both dedup strategies
/// unchanged, with both metric counters left at zero.
// NOTE(review): `new_batch` arguments are presumably (primary key, timestamps, sequences,
// op types, field values) — confirm against the helper in `test_util`.
#[tokio::test]
async fn test_dedup_reader_no_duplications() {
let input = [
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(b"k1", &[3], &[13], &[OpType::Put], &[23]),
new_batch(
b"k2",
&[1, 2],
&[111, 112],
&[OpType::Put, OpType::Put],
&[31, 32],
),
];
// Test last row.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true), None);
// The expected output is the input itself.
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
// Test last non-null.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
// The expected output is the input itself.
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
/// Runs the `LastRow` strategy over input that contains an empty batch, duplicated rows
/// and deleted rows, once with `filter_deleted = true` and once with `false`, and checks
/// both the emitted batches and the metric counters.
#[tokio::test]
async fn test_dedup_reader_duplications() {
let input = [
new_batch(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[11, 12],
),
// empty batch.
new_batch(b"k1", &[], &[], &[], &[]),
// Duplicate with the previous batch.
new_batch(
b"k1",
&[2, 3, 4],
&[10, 13, 13],
&[OpType::Put, OpType::Put, OpType::Delete],
&[2, 13, 14],
),
new_batch(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[101, 0],
),
new_batch(b"k2", &[2], &[19], &[OpType::Put], &[102]),
new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
// This batch won't increase the deleted rows count as it
// is filtered out by the previous batch.
new_batch(b"k3", &[2], &[19], &[OpType::Delete], &[0]),
];
// Filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true), None);
check_reader_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[11, 12],
),
new_batch(b"k1", &[3], &[13], &[OpType::Put], &[13]),
new_batch(b"k2", &[1], &[20], &[OpType::Put], &[101]),
new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
],
)
.await;
assert_eq!(5, reader.metrics().num_unselected_rows);
assert_eq!(2, reader.metrics().num_deleted_rows);
// Does not filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(false), None);
check_reader_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[11, 12],
),
// The deleted row is kept in the output this time.
new_batch(
b"k1",
&[3, 4],
&[13, 13],
&[OpType::Put, OpType::Delete],
&[13, 14],
),
new_batch(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[101, 0],
),
new_batch(b"k3", &[2], &[20], &[OpType::Put], &[202]),
],
)
.await;
assert_eq!(3, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
/// Returns a new [Batch] whose field has column id 1, 2.
fn new_batch_multi_fields(
@@ -839,235 +526,6 @@ mod tests {
builder.build().unwrap()
}
/// Runs the `LastNonNull` strategy over two-field batches that contain an empty batch,
/// duplicated rows with partially null fields and deleted rows, once with
/// `filter_deleted = true` and once with `false`, and checks both the emitted batches
/// and the metric counters.
#[tokio::test]
async fn test_last_non_null_merge() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (None, None)],
),
// empty batch.
new_batch_multi_fields(b"k1", &[], &[], &[], &[]),
// Duplicate with the previous batch.
new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(Some(12), None)]),
new_batch_multi_fields(
b"k1",
&[2, 3, 4],
&[10, 13, 13],
&[OpType::Put, OpType::Put, OpType::Delete],
&[(Some(2), Some(22)), (Some(13), None), (None, Some(14))],
),
new_batch_multi_fields(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[(Some(101), Some(101)), (None, None)],
),
new_batch_multi_fields(
b"k2",
&[2],
&[19],
&[OpType::Put],
&[(Some(102), Some(102))],
),
new_batch_multi_fields(
b"k3",
&[2],
&[20],
&[OpType::Put],
&[(Some(202), Some(202))],
),
// This batch won't increase the deleted rows count as it
// is filtered out by the previous batch. (All fields are null).
new_batch_multi_fields(b"k3", &[2], &[19], &[OpType::Delete], &[(None, None)]),
];
// Filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[
// The second row of k1 has its null fields filled from the duplicate rows.
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(22))],
),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), None)]),
new_batch_multi_fields(
b"k2",
&[1],
&[20],
&[OpType::Put],
&[(Some(101), Some(101))],
),
new_batch_multi_fields(
b"k3",
&[2],
&[20],
&[OpType::Put],
&[(Some(202), Some(202))],
),
],
)
.await;
assert_eq!(6, reader.metrics().num_unselected_rows);
assert_eq!(2, reader.metrics().num_deleted_rows);
// Does not filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(22))],
),
new_batch_multi_fields(
b"k1",
&[3, 4],
&[13, 13],
&[OpType::Put, OpType::Delete],
&[(Some(13), None), (None, Some(14))],
),
new_batch_multi_fields(
b"k2",
&[1, 2],
&[20, 20],
&[OpType::Put, OpType::Delete],
&[(Some(101), Some(101)), (None, None)],
),
new_batch_multi_fields(
b"k3",
&[2],
&[20],
&[OpType::Put],
&[(Some(202), Some(202))],
),
],
)
.await;
assert_eq!(4, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
/// Feeds a single batch (put, delete, put) to the `LastNonNull` strategy: with
/// `filter_deleted = true` only the deleted row is removed, with `false` the batch is
/// returned unchanged.
#[tokio::test]
async fn test_last_non_null_skip_merge_single() {
let input = [new_batch_multi_fields(
b"k1",
&[1, 2, 3],
&[13, 11, 13],
&[OpType::Put, OpType::Delete, OpType::Put],
&[(Some(11), Some(11)), (None, None), (Some(13), Some(13))],
)];
// Filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[new_batch_multi_fields(
b"k1",
&[1, 3],
&[13, 13],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(13), Some(13))],
)],
)
.await;
assert_eq!(1, reader.metrics().num_unselected_rows);
assert_eq!(1, reader.metrics().num_deleted_rows);
// Does not filter deleted: the expected output is the input itself.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
/// When the first occurrence of a duplicated row has no null field, the later duplicates
/// must not change it: the expected output keeps `(Some(12), Some(12))` for the second
/// row of k1 and the two duplicate rows are counted as unselected.
#[tokio::test]
async fn test_last_non_null_skip_merge_no_null() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(12))],
),
new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
new_batch_multi_fields(
b"k1",
&[2, 3],
&[9, 13],
&[OpType::Put, OpType::Put],
&[(Some(32), None), (Some(13), Some(13))],
),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (Some(12), Some(12))],
),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(13), Some(13))]),
],
)
.await;
assert_eq!(2, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
/// A duplicated row whose first occurrence is all null gets the non-null field of the
/// later duplicate; a field that is null in every occurrence stays null in the output.
#[tokio::test]
async fn test_last_non_null_merge_null() {
let input = [
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (None, None)],
),
new_batch_multi_fields(b"k1", &[2], &[10], &[OpType::Put], &[(None, Some(22))]),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[
// The second row keeps a null first field and takes `Some(22)` from the duplicate.
new_batch_multi_fields(
b"k1",
&[1, 2],
&[13, 11],
&[OpType::Put, OpType::Put],
&[(Some(11), Some(11)), (None, Some(22))],
),
new_batch_multi_fields(b"k1", &[3], &[13], &[OpType::Put], &[(Some(33), None)]),
],
)
.await;
assert_eq!(1, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
}
fn check_dedup_strategy(input: &[Batch], strategy: &mut dyn DedupStrategy, expect: &[Batch]) {
let mut actual = Vec::new();
let mut metrics = DedupMetrics::default();

View File

@@ -14,8 +14,9 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_telemetry::debug;
@@ -34,7 +35,6 @@ use crate::error::{ComputeArrowSnafu, Result};
use crate::memtable::BoxedRecordBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::BoxedRecordBatchStream;
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::sst::parquet::flat_format::{
primary_key_column_index, sequence_column_index, time_index_column_index,
};
@@ -105,6 +105,84 @@ struct BatchCursor {
row_idx: usize,
}
/// Sink that receives merge metrics.
pub trait MergeMetricsReport: Send + Sync {
    /// Reports the metrics and resets them.
    fn report(&self, metrics: &mut MergeMetrics);
}

/// Metrics collected by the merge reader.
#[derive(Default)]
pub struct MergeMetrics {
    /// Time spent initializing the reader.
    pub(crate) init_cost: Duration,
    /// Total time spent scanning.
    pub(crate) scan_cost: Duration,
    /// How many times whole batches were fetched.
    pub(crate) num_fetch_by_batches: usize,
    /// How many times rows were fetched.
    pub(crate) num_fetch_by_rows: usize,
    /// Time spent fetching batches from the sources.
    pub(crate) fetch_cost: Duration,
}

impl fmt::Debug for MergeMetrics {
    /// Writes the metrics as a JSON-like object that omits zero-valued fields.
    ///
    /// Nothing but `{}` is written when `scan_cost` is zero, whatever the other
    /// fields hold.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            init_cost,
            scan_cost,
            num_fetch_by_batches,
            num_fetch_by_rows,
            fetch_cost,
        } = self;

        if scan_cost.is_zero() {
            return f.write_str("{}");
        }

        // `scan_cost` is always the first entry, so every later entry can start
        // with a separator.
        write!(f, r#"{{"scan_cost":"{:?}""#, scan_cost)?;
        if !init_cost.is_zero() {
            write!(f, r#", "init_cost":"{:?}""#, init_cost)?;
        }
        if *num_fetch_by_batches > 0 {
            write!(f, r#", "num_fetch_by_batches":{}"#, num_fetch_by_batches)?;
        }
        if *num_fetch_by_rows > 0 {
            write!(f, r#", "num_fetch_by_rows":{}"#, num_fetch_by_rows)?;
        }
        if !fetch_cost.is_zero() {
            write!(f, r#", "fetch_cost":"{:?}""#, fetch_cost)?;
        }
        f.write_str("}")
    }
}

impl MergeMetrics {
    /// Adds every counter and duration of `other` to `self`.
    pub(crate) fn merge(&mut self, other: &MergeMetrics) {
        self.init_cost += other.init_cost;
        self.scan_cost += other.scan_cost;
        self.num_fetch_by_batches += other.num_fetch_by_batches;
        self.num_fetch_by_rows += other.num_fetch_by_rows;
        self.fetch_cost += other.fetch_cost;
    }

    /// Hands the metrics to `reporter` once `scan_cost` exceeds 10ms.
    ///
    /// Does nothing below the threshold or when no reporter is set.
    pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn MergeMetricsReport>>) {
        if self.scan_cost.as_millis() <= 10 {
            return;
        }
        if let Some(reporter) = reporter {
            reporter.report(self);
        }
    }
}
/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
// Ports from https://github.com/apache/datafusion/blob/49.0.0/datafusion/physical-plan/src/sorts/builder.rs
// Adds the `take_remaining_rows()` method.

View File

@@ -1,982 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Merge reader implementation.
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{fmt, mem};
use async_trait::async_trait;
use common_telemetry::debug;
use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
/// Trait for reporting merge metrics.
///
/// Implementations must be `Send + Sync`; the readers in this file hold the reporter
/// as `Arc<dyn MergeMetricsReport>`.
pub trait MergeMetricsReport: Send + Sync {
/// Reports and resets the metrics.
///
/// `metrics` is passed mutably so the implementation can reset it after reporting.
fn report(&self, metrics: &mut MergeMetrics);
}
/// Reader to merge sorted batches.
///
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
/// ignore op type as sequence is already unique).
/// 2. Batches from sources **must** not be empty.
///
/// The reader won't concatenate batches. Each batch returned by the reader also doesn't
/// contain duplicate rows. But the last (primary key, timestamp) of a batch may be the same
/// as the first one in the next batch.
pub struct MergeReader {
/// Holds [Node]s whose current batch's key range **is** overlapped with the merge window.
/// Each node yields batches from a `source`.
///
/// A [Node] in this heap **must** not be empty. A `merge window` is the (primary key, timestamp)
/// range of the **root node** in the `hot` heap.
hot: BinaryHeap<Node>,
/// Holds [Node]s whose current batch's key range **isn't** overlapped with the merge window.
///
/// A [Node] in this heap **must** not be empty.
cold: BinaryHeap<Node>,
/// Batch to output.
output_batch: Option<Batch>,
/// Local metrics.
metrics: MergeMetrics,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
#[async_trait]
impl BatchReader for MergeReader {
    /// Returns the next merged batch, or `None` once the hot heap is drained without
    /// producing output.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let timer = Instant::now();

        // Keep fetching until a batch is ready for output or the hot heap is drained.
        while self.output_batch.is_none() && !self.hot.is_empty() {
            if self.hot.len() == 1 {
                // A single hot node needs no merge sort: take its whole batch.
                self.fetch_batch_from_hottest().await?;
                self.metrics.num_fetch_by_batches += 1;
            } else {
                // Only rows that sort before the next node can be taken from the
                // hottest node.
                self.fetch_rows_from_hottest().await?;
                self.metrics.num_fetch_by_rows += 1;
            }
        }

        // `None` here means nothing was fetched.
        let output = self.output_batch.take();
        self.metrics.scan_cost += timer.elapsed();
        self.metrics.maybe_report(&self.metrics_reporter);
        Ok(output)
    }
}
impl Drop for MergeReader {
    /// Logs the final metrics, records the scan and fetch costs in `READ_STAGE_ELAPSED`
    /// and hands any remaining metrics to the reporter.
    fn drop(&mut self) {
        debug!("Merge reader finished, metrics: {:?}", self.metrics);

        let stage_costs = [
            ("merge", self.metrics.scan_cost),
            ("merge_fetch", self.metrics.fetch_cost),
        ];
        for (stage, cost) in stage_costs {
            READ_STAGE_ELAPSED
                .with_label_values(&[stage])
                .observe(cost.as_secs_f64());
        }

        // Report any remaining metrics.
        if let Some(reporter) = self.metrics_reporter.as_ref() {
            reporter.report(&mut self.metrics);
        }
    }
}
impl MergeReader {
/// Creates and initializes a new [MergeReader].
///
/// Builds a [Node] for every source, keeps the nodes that are not EOF, and fills the
/// `hot` heap before returning. The elapsed time is added to both `init_cost` and
/// `scan_cost`.
pub async fn new(
sources: Vec<Source>,
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
) -> Result<MergeReader> {
let start = Instant::now();
let mut metrics = MergeMetrics::default();
let mut cold = BinaryHeap::with_capacity(sources.len());
let hot = BinaryHeap::with_capacity(sources.len());
for source in sources {
let node = Node::new(source, &mut metrics).await?;
if !node.is_eof() {
// Ensures `cold` doesn't have EOF nodes.
cold.push(node);
}
}
let mut reader = MergeReader {
hot,
cold,
output_batch: None,
metrics,
metrics_reporter,
};
// Initializes the reader.
reader.refill_hot();
let elapsed = start.elapsed();
reader.metrics.init_cost += elapsed;
reader.metrics.scan_cost += elapsed;
Ok(reader)
}
/// Moves nodes in the `cold` heap whose key range is overlapped with the current merge
/// window to the `hot` heap.
///
/// If the `hot` heap is empty, the top node of `cold` is moved unconditionally.
fn refill_hot(&mut self) {
while !self.cold.is_empty() {
if let Some(merge_window) = self.hot.peek() {
let warmest = self.cold.peek().unwrap();
if warmest.is_behind(merge_window) {
// If the warmest node in the `cold` heap is totally after the
// `merge_window`, then there is no need to add more nodes into the `hot`
// heap for merge sorting.
break;
}
}
let warmest = self.cold.pop().unwrap();
self.hot.push(warmest);
}
}
/// Fetches one batch from the hottest node.
///
/// Must only be called when the `hot` heap holds exactly one node.
async fn fetch_batch_from_hottest(&mut self) -> Result<()> {
assert_eq!(1, self.hot.len());
let mut hottest = self.hot.pop().unwrap();
let batch = hottest.fetch_batch(&mut self.metrics).await?;
Self::maybe_output_batch(batch, &mut self.output_batch)?;
self.reheap(hottest)
}
/// Fetches non-duplicated rows from the hottest node.
async fn fetch_rows_from_hottest(&mut self) -> Result<()> {
// Safety: `next_batch()` only calls this method when the hot heap has more than 1 element.
// Pop hottest node.
let mut top_node = self.hot.pop().unwrap();
let top = top_node.current_batch();
// Min timestamp of the next node's current batch.
let next_min_ts = {
let next_node = self.hot.peek().unwrap();
let next = next_node.current_batch();
// top and next have overlapping rows so they must have the same primary keys.
debug_assert_eq!(top.primary_key(), next.primary_key());
// Safety: Batches in the heap are not empty, so we can use unwrap here.
next.first_timestamp().unwrap()
};
// Safety: Batches in the heap are not empty, so we can use unwrap here.
let timestamps = top.timestamps_native().unwrap();
// Binary searches the timestamp in the top batch.
// Safety: Batches should have the same timestamp resolution so we can compare the native
// value directly.
let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) {
Ok(pos) => pos,
Err(pos) => {
// No duplicate timestamp. Outputs the rows before `pos`.
Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
top_node.skip_rows(pos, &mut self.metrics).await?;
return self.reheap(top_node);
}
};
// Decides how many leading rows can be output without removing duplicate timestamps.
let output_end = if duplicate_pos == 0 {
// If the first timestamp of the top node is duplicated, we can simply return the first row
// as the heap ensures it is the one with the largest sequence.
1
} else {
// We don't know which one has the larger sequence so we use the range before
// the duplicate pos.
duplicate_pos
};
Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
top_node.skip_rows(output_end, &mut self.metrics).await?;
self.reheap(top_node)
}
/// Pushes the node popped from `hot` back to a proper heap.
fn reheap(&mut self, node: Node) -> Result<()> {
if node.is_eof() {
// If the node is EOF, don't put it into the heap again.
// The merge window would be updated, need to refill the hot heap.
self.refill_hot();
} else {
// Find a proper heap for this node.
let node_is_cold = if let Some(hottest) = self.hot.peek() {
// If the key range of this node is behind the hottest node's then we can
// push it to the cold heap. Otherwise we should push it to the hot heap.
node.is_behind(hottest)
} else {
// The hot heap is empty, but we don't know whether the current
// batch of this node is still the hottest.
true
};
if node_is_cold {
self.cold.push(node);
} else {
self.hot.push(node);
}
// Anyway, the merge window has been changed, we need to refill the hot heap.
self.refill_hot();
}
Ok(())
}
/// Sets the `batch` to the `output_batch`.
///
/// Ignores the `batch` if it is empty. The `output_batch` is expected to be `None`
/// on entry (checked by a debug assertion).
fn maybe_output_batch(batch: Batch, output_batch: &mut Option<Batch>) -> Result<()> {
debug_assert!(output_batch.is_none());
if batch.is_empty() {
return Ok(());
}
*output_batch = Some(batch);
Ok(())
}
}
/// Collects input sources and builds an initialized [MergeReader] from them.
#[derive(Default)]
pub struct MergeReaderBuilder {
    /// Sources to merge.
    ///
    /// Every source must yield batches with the same schema.
    sources: Vec<Source>,
    /// Reporter passed to the reader on build, if any.
    metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}

impl MergeReaderBuilder {
    /// Creates a builder with no sources and no metrics reporter.
    pub fn new() -> MergeReaderBuilder {
        MergeReaderBuilder {
            sources: Vec::new(),
            metrics_reporter: None,
        }
    }

    /// Creates a builder that starts with the given `sources` and no metrics reporter.
    pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
        let mut builder = MergeReaderBuilder::new();
        builder.sources = sources;
        builder
    }

    /// Appends a batch reader as a new source.
    pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self {
        let source = Source::Reader(reader);
        self.sources.push(source);
        self
    }

    /// Appends a batch iterator as a new source.
    pub fn push_batch_iter(&mut self, iter: BoxedBatchIterator) -> &mut Self {
        let source = Source::Iter(iter);
        self.sources.push(source);
        self
    }

    /// Replaces the metrics reporter; `None` clears it.
    pub fn with_metrics_reporter(
        &mut self,
        reporter: Option<Arc<dyn MergeMetricsReport>>,
    ) -> &mut Self {
        self.metrics_reporter = reporter;
        self
    }

    /// Builds and initializes the reader.
    ///
    /// The sources and the reporter are moved out of the builder, which is left
    /// empty afterwards.
    pub async fn build(&mut self) -> Result<MergeReader> {
        MergeReader::new(mem::take(&mut self.sources), self.metrics_reporter.take()).await
    }
}
/// Counters and timings collected by the merge reader.
#[derive(Default)]
pub struct MergeMetrics {
    /// Time spent initializing the reader.
    pub(crate) init_cost: Duration,
    /// Total time spent scanning.
    pub(crate) scan_cost: Duration,
    /// How many times data was fetched batch by batch.
    pub(crate) num_fetch_by_batches: usize,
    /// How many times data was fetched row by row.
    pub(crate) num_fetch_by_rows: usize,
    /// Time spent fetching batches from the sources.
    pub(crate) fetch_cost: Duration,
}

impl fmt::Debug for MergeMetrics {
    /// Writes the metrics as a compact JSON-like object.
    ///
    /// Prints `{}` when `scan_cost` is zero. Otherwise `scan_cost` is always printed and
    /// every other field is printed only when it is non-zero.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let MergeMetrics {
            init_cost,
            scan_cost,
            num_fetch_by_batches,
            num_fetch_by_rows,
            fetch_cost,
        } = self;

        // Nothing was scanned, so the other fields are not printed either.
        if *scan_cost == Duration::ZERO {
            return write!(f, "{{}}");
        }

        write!(f, r#"{{"scan_cost":"{:?}""#, scan_cost)?;
        if *init_cost != Duration::ZERO {
            write!(f, r#", "init_cost":"{:?}""#, init_cost)?;
        }
        if *num_fetch_by_batches != 0 {
            write!(f, r#", "num_fetch_by_batches":{}"#, num_fetch_by_batches)?;
        }
        if *num_fetch_by_rows != 0 {
            write!(f, r#", "num_fetch_by_rows":{}"#, num_fetch_by_rows)?;
        }
        if *fetch_cost != Duration::ZERO {
            write!(f, r#", "fetch_cost":"{:?}""#, fetch_cost)?;
        }
        write!(f, "}}")
    }
}
impl MergeMetrics {
    /// Adds every duration and counter of `other` onto `self`.
    pub(crate) fn merge(&mut self, other: &MergeMetrics) {
        // Exhaustive pattern: adding a field to the struct forces an update here.
        let &MergeMetrics {
            init_cost,
            scan_cost,
            num_fetch_by_batches,
            num_fetch_by_rows,
            fetch_cost,
        } = other;

        self.init_cost += init_cost;
        self.scan_cost += scan_cost;
        self.num_fetch_by_batches += num_fetch_by_batches;
        self.num_fetch_by_rows += num_fetch_by_rows;
        self.fetch_cost += fetch_cost;
    }

    /// Hands the metrics to `reporter` when `scan_cost` is more than 10 whole milliseconds.
    ///
    /// Does nothing when the threshold is not exceeded or when no reporter is set.
    /// This method does not reset the metrics itself; the reporter receives `&mut self`.
    // NOTE(review): presumably `MergeMetricsReport::report` resets the metrics — confirm.
    pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn MergeMetricsReport>>) {
        if self.scan_cost.as_millis() <= 10 {
            return;
        }
        if let Some(r) = reporter {
            r.report(self);
        }
    }
}
/// A `Node` represent an individual input data source to be merged.
struct Node {
/// Data source of this `Node`.
source: Source,
/// Current batch to be read. The node ensures the batch is not empty.
///
/// `None` means the `source` has reached EOF.
current_batch: Option<CompareFirst>,
}
impl Node {
/// Initialize a node.
///
/// It tries to fetch one batch from the `source`.
async fn new(mut source: Source, metrics: &mut MergeMetrics) -> Result<Node> {
// Ensures batch is not empty.
let start = Instant::now();
let current_batch = source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
Ok(Node {
source,
current_batch,
})
}
/// Returns whether the node still has batch to read.
fn is_eof(&self) -> bool {
self.current_batch.is_none()
}
/// Returns the primary key of current batch.
///
/// # Panics
/// Panics if the node has reached EOF.
fn primary_key(&self) -> &[u8] {
self.current_batch().primary_key()
}
/// Returns current batch.
///
/// # Panics
/// Panics if the node has reached EOF.
fn current_batch(&self) -> &Batch {
&self.current_batch.as_ref().unwrap().0
}
/// Returns current batch and fetches next batch
/// from the source.
///
/// # Panics
/// Panics if the node has reached EOF.
async fn fetch_batch(&mut self, metrics: &mut MergeMetrics) -> Result<Batch> {
let current = self.current_batch.take().unwrap();
let start = Instant::now();
// Ensures batch is not empty.
self.current_batch = self.source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
Ok(current.0)
}
/// Returns true if the key range of current batch in `self` is behind (exclusive) current
/// batch in `other`.
///
/// # Panics
/// Panics if either `self` or `other` is EOF.
fn is_behind(&self, other: &Node) -> bool {
debug_assert!(!self.current_batch().is_empty());
debug_assert!(!other.current_batch().is_empty());
// We only compare pk and timestamp so nodes in the cold
// heap don't have overlapping timestamps with the hottest node
// in the hot heap.
self.primary_key().cmp(other.primary_key()).then_with(|| {
self.current_batch()
.first_timestamp()
.cmp(&other.current_batch().last_timestamp())
}) == Ordering::Greater
}
/// Skips first `num_to_skip` rows from node's current batch. If current batch is empty it fetches
/// next batch from the node.
///
/// # Panics
/// Panics if the node is EOF.
async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut MergeMetrics) -> Result<()> {
let batch = self.current_batch();
debug_assert!(batch.num_rows() >= num_to_skip);
let remaining = batch.num_rows() - num_to_skip;
if remaining == 0 {
// Nothing remains, we need to fetch next batch to ensure the batch is not empty.
self.fetch_batch(metrics).await?;
} else {
debug_assert!(!batch.is_empty());
self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining)));
}
Ok(())
}
}
/// Two nodes are equal when their current batches compare equal.
impl PartialEq for Node {
    fn eq(&self, other: &Node) -> bool {
        self.current_batch.eq(&other.current_batch)
    }
}

impl Eq for Node {}

impl PartialOrd for Node {
    fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Node {
    /// Orders nodes by their current batch with the operands swapped.
    ///
    /// The std binary heap is a max heap, while the merge wants nodes in ascending
    /// order, hence the reversed comparison.
    fn cmp(&self, other: &Node) -> Ordering {
        let (lhs, rhs) = (&other.current_batch, &self.current_batch);
        Ord::cmp(lhs, rhs)
    }
}
/// Wrapper that compares a [Batch] by its first row only.
///
/// The op type is not compared; the sequence is enough to tell rows apart.
struct CompareFirst(Batch);

impl PartialEq for CompareFirst {
    /// Equal when primary key, first timestamp and first sequence all match.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.0, &other.0);
        if lhs.primary_key() != rhs.primary_key() {
            return false;
        }
        if lhs.first_timestamp() != rhs.first_timestamp() {
            return false;
        }
        lhs.first_sequence() == rhs.first_sequence()
    }
}

impl Eq for CompareFirst {}

impl PartialOrd for CompareFirst {
    fn partial_cmp(&self, other: &CompareFirst) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for CompareFirst {
    /// Compares by primary key, then time index, then sequence in descending order.
    fn cmp(&self, other: &CompareFirst) -> Ordering {
        let (lhs, rhs) = (&self.0, &other.0);

        let by_key = lhs.primary_key().cmp(rhs.primary_key());
        if by_key != Ordering::Equal {
            return by_key;
        }

        let by_time = lhs.first_timestamp().cmp(&rhs.first_timestamp());
        if by_time != Ordering::Equal {
            return by_time;
        }

        // Operands are swapped so that the larger sequence sorts first.
        rhs.first_sequence().cmp(&lhs.first_sequence())
    }
}
// Tests for the merge reader built by `MergeReaderBuilder`.
//
// NOTE(review): `new_batch` arguments look like (primary key, timestamps, sequences,
// op types, field values) judging from the fixtures below — confirm against `test_util`.
#[cfg(test)]
mod tests {
use api::v1::OpType;
use super::*;
use crate::test_util::{VecBatchReader, check_reader_result, new_batch};
// A reader built without any source returns `None`, also on repeated calls.
#[tokio::test]
async fn test_merge_reader_empty() {
let mut reader = MergeReaderBuilder::new().build().await.unwrap();
assert!(reader.next_batch().await.unwrap().is_none());
assert!(reader.next_batch().await.unwrap().is_none());
}
// Batches of two sources that do not overlap are returned whole, interleaved in order.
#[tokio::test]
async fn test_merge_non_overlapping() {
let reader1 = VecBatchReader::new(&[
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(
b"k1",
&[7, 8],
&[17, 18],
&[OpType::Put, OpType::Delete],
&[27, 28],
),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
),
]);
let reader2 = VecBatchReader::new(&[new_batch(
b"k1",
&[4, 5],
&[14, 15],
&[OpType::Put, OpType::Put],
&[24, 25],
)]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
.push_batch_iter(Box::new(reader2))
.build()
.await
.unwrap();
check_reader_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(
b"k1",
&[4, 5],
&[14, 15],
&[OpType::Put, OpType::Put],
&[24, 25],
),
new_batch(
b"k1",
&[7, 8],
&[17, 18],
&[OpType::Put, OpType::Delete],
&[27, 28],
),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
),
],
)
.await;
}
// Two sources with interleaved timestamps for the same key are emitted row by row.
#[tokio::test]
async fn test_merge_reheap_hot() {
let reader1 = VecBatchReader::new(&[
new_batch(
b"k1",
&[1, 3],
&[10, 10],
&[OpType::Put, OpType::Put],
&[21, 23],
),
new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
]);
let reader2 = VecBatchReader::new(&[new_batch(
b"k1",
&[2, 4],
&[11, 11],
&[OpType::Put, OpType::Put],
&[32, 34],
)]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
.push_batch_iter(Box::new(reader2))
.build()
.await
.unwrap();
check_reader_result(
&mut reader,
&[
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
new_batch(b"k1", &[3], &[10], &[OpType::Put], &[23]),
new_batch(b"k1", &[4], &[11], &[OpType::Put], &[34]),
new_batch(b"k2", &[3], &[10], &[OpType::Put], &[23]),
],
)
.await;
}
// Overlapping sources: duplicated timestamps are all kept, the row with the larger
// third argument (presumably the sequence) comes first.
#[tokio::test]
async fn test_merge_overlapping() {
let reader1 = VecBatchReader::new(&[
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(
b"k1",
&[4, 5],
&[14, 15],
// This overrides 4 and deletes 5.
&[OpType::Put, OpType::Delete],
&[24, 25],
),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
// This deletes 2.
&[OpType::Delete, OpType::Put],
&[22, 23],
),
]);
let reader2 = VecBatchReader::new(&[
new_batch(
b"k1",
&[3, 4, 5],
&[10, 10, 10],
&[OpType::Put, OpType::Put, OpType::Put],
&[33, 34, 35],
),
new_batch(
b"k2",
&[1, 10],
&[11, 20],
&[OpType::Put, OpType::Put],
&[21, 30],
),
]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
.push_batch_iter(Box::new(reader2))
.build()
.await
.unwrap();
check_reader_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(b"k1", &[3], &[10], &[OpType::Put], &[33]),
new_batch(b"k1", &[4], &[14], &[OpType::Put], &[24]),
new_batch(b"k1", &[4], &[10], &[OpType::Put], &[34]),
new_batch(b"k1", &[5], &[15], &[OpType::Delete], &[25]),
new_batch(b"k1", &[5], &[10], &[OpType::Put], &[35]),
new_batch(b"k2", &[1], &[11], &[OpType::Put], &[21]),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
),
new_batch(b"k2", &[10], &[20], &[OpType::Put], &[30]),
],
)
.await;
}
// Rows with `OpType::Delete` are passed through by the merge reader unchanged.
#[tokio::test]
async fn test_merge_deleted() {
let reader1 = VecBatchReader::new(&[
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Delete, OpType::Delete],
&[21, 22],
),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
),
]);
let reader2 = VecBatchReader::new(&[new_batch(
b"k1",
&[4, 5],
&[14, 15],
&[OpType::Delete, OpType::Delete],
&[24, 25],
)]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
.push_batch_iter(Box::new(reader2))
.build()
.await
.unwrap();
check_reader_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Delete, OpType::Delete],
&[21, 22],
),
new_batch(
b"k1",
&[4, 5],
&[14, 15],
&[OpType::Delete, OpType::Delete],
&[24, 25],
),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
),
],
)
.await;
}
// The second source holds a single row that duplicates the first timestamp of the
// first source; it is exhausted while the first source still has rows.
#[tokio::test]
async fn test_merge_next_node_empty() {
let reader1 = VecBatchReader::new(&[new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
)]);
let reader2 = VecBatchReader::new(&[new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33])]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
.push_batch_iter(Box::new(reader2))
.build()
.await
.unwrap();
check_reader_result(
&mut reader,
&[
new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21]),
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[33]),
new_batch(b"k1", &[2], &[12], &[OpType::Put], &[22]),
],
)
.await;
}
// The first source runs out of rows at the duplicated timestamp while the second
// source still has rows left.
#[tokio::test]
async fn test_merge_top_node_empty() {
let reader1 = VecBatchReader::new(&[new_batch(
b"k1",
&[1, 2],
&[10, 10],
&[OpType::Put, OpType::Put],
&[21, 22],
)]);
let reader2 = VecBatchReader::new(&[new_batch(
b"k1",
&[2, 3],
&[11, 11],
&[OpType::Put, OpType::Put],
&[32, 33],
)]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
.push_batch_iter(Box::new(reader2))
.build()
.await
.unwrap();
check_reader_result(
&mut reader,
&[
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
],
)
.await;
}
// Three sources with wide, overlapping time ranges; the third one fits entirely
// between the rows of the other two and is returned as one batch.
#[tokio::test]
async fn test_merge_large_range() {
let reader1 = VecBatchReader::new(&[new_batch(
b"k1",
&[1, 10],
&[10, 10],
&[OpType::Put, OpType::Put],
&[21, 30],
)]);
let reader2 = VecBatchReader::new(&[new_batch(
b"k1",
&[1, 20],
&[11, 11],
&[OpType::Put, OpType::Put],
&[31, 40],
)]);
// The hot heap has a node that doesn't have duplicate
// timestamps.
let reader3 = VecBatchReader::new(&[new_batch(
b"k1",
&[6, 8],
&[11, 11],
&[OpType::Put, OpType::Put],
&[36, 38],
)]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
.push_batch_iter(Box::new(reader2))
.push_batch_reader(Box::new(reader3))
.build()
.await
.unwrap();
check_reader_result(
&mut reader,
&[
new_batch(b"k1", &[1], &[11], &[OpType::Put], &[31]),
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
new_batch(
b"k1",
&[6, 8],
&[11, 11],
&[OpType::Put, OpType::Put],
&[36, 38],
),
new_batch(b"k1", &[10], &[10], &[OpType::Put], &[30]),
new_batch(b"k1", &[20], &[11], &[OpType::Put], &[40]),
],
)
.await;
}
// Ten sources each carry the same eight timestamps; every duplicate is kept and, per
// timestamp, rows come out with the third `new_batch` argument descending from 9 to 0.
#[tokio::test]
async fn test_merge_many_duplicates() {
let mut builder = MergeReaderBuilder::new();
for i in 0..10 {
let batches: Vec<_> = (0..8)
.map(|ts| new_batch(b"k1", &[ts], &[i], &[OpType::Put], &[100]))
.collect();
let reader = VecBatchReader::new(&batches);
builder.push_batch_reader(Box::new(reader));
}
let mut reader = builder.build().await.unwrap();
let mut expect = Vec::with_capacity(80);
for ts in 0..8 {
for i in 0..10 {
let batch = new_batch(b"k1", &[ts], &[9 - i], &[OpType::Put], &[100]);
expect.push(batch);
}
}
check_reader_result(&mut reader, &expect).await;
}
// Same data as `test_merge_top_node_empty`, but the builder is created with
// `from_sources`; both rows of the duplicated timestamp are kept.
#[tokio::test]
async fn test_merge_keep_duplicate() {
let reader1 = VecBatchReader::new(&[new_batch(
b"k1",
&[1, 2],
&[10, 10],
&[OpType::Put, OpType::Put],
&[21, 22],
)]);
let reader2 = VecBatchReader::new(&[new_batch(
b"k1",
&[2, 3],
&[11, 11],
&[OpType::Put, OpType::Put],
&[32, 33],
)]);
let sources = vec![
Source::Reader(Box::new(reader1)),
Source::Iter(Box::new(reader2)),
];
let mut reader = MergeReaderBuilder::from_sources(sources)
.build()
.await
.unwrap();
check_reader_result(
&mut reader,
&[
new_batch(b"k1", &[1], &[10], &[OpType::Put], &[21]),
new_batch(b"k1", &[2], &[11], &[OpType::Put], &[32]),
new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]),
new_batch(b"k1", &[3], &[11], &[OpType::Put], &[33]),
],
)
.await;
}
}

View File

@@ -1,505 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Batch without an encoded primary key.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::OpType;
use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt8Array, UInt64Array};
use datatypes::arrow::compute::filter_record_batch;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{
ComputeArrowSnafu, CreateDefaultSnafu, InvalidRequestSnafu, NewRecordBatchSnafu, Result,
UnexpectedSnafu,
};
/// Number of columns that have fixed positions at the end of a plain batch.
///
/// Covers all internal columns: the sequence column followed by the op type column.
pub(crate) const PLAIN_FIXED_POS_COLUMN_NUM: usize = 2;
/// [PlainBatch] is a batch of rows stored as a plain [RecordBatch].
///
/// Columns keep the order in which they are read from the SST. The batch always ends
/// with the two internal columns for now; this may change in the future.
#[derive(Debug)]
pub struct PlainBatch {
    /// The wrapped record batch.
    record_batch: RecordBatch,
}

impl PlainBatch {
    /// Wraps `record_batch` into a [PlainBatch].
    ///
    /// # Panics
    /// Panics if the record batch has fewer columns than the internal columns require.
    pub fn new(record_batch: RecordBatch) -> Self {
        let num_columns = record_batch.num_columns();
        assert!(
            num_columns >= PLAIN_FIXED_POS_COLUMN_NUM,
            "record batch missing internal columns, num_columns: {}",
            num_columns
        );
        Self { record_batch }
    }

    /// Builds a new [PlainBatch] that reuses this batch's schema with other `columns`.
    pub fn with_new_columns(&self, columns: Vec<ArrayRef>) -> Result<Self> {
        RecordBatch::try_new(self.record_batch.schema(), columns)
            .context(NewRecordBatchSnafu)
            .map(Self::new)
    }

    /// Returns how many columns the batch has.
    pub fn num_columns(&self) -> usize {
        self.record_batch.num_columns()
    }

    /// Returns how many rows the batch has.
    pub fn num_rows(&self) -> usize {
        self.record_batch.num_rows()
    }

    /// Returns true if the batch has no rows.
    pub fn is_empty(&self) -> bool {
        self.record_batch.num_rows() == 0
    }

    /// Returns all columns, internal columns included.
    pub fn columns(&self) -> &[ArrayRef] {
        self.record_batch.columns()
    }

    /// Returns the array of the column at index `idx`.
    pub fn column(&self, idx: usize) -> &ArrayRef {
        self.record_batch.column(idx)
    }

    /// Returns the trailing slice that holds the internal columns.
    pub fn internal_columns(&self) -> &[ArrayRef] {
        let first_internal = self.record_batch.num_columns() - PLAIN_FIXED_POS_COLUMN_NUM;
        &self.record_batch.columns()[first_internal..]
    }

    /// Borrows the inner record batch.
    pub fn as_record_batch(&self) -> &RecordBatch {
        &self.record_batch
    }

    /// Consumes the batch and returns the inner record batch.
    pub fn into_record_batch(self) -> RecordBatch {
        self.record_batch
    }

    /// Keeps only the rows selected by `predicate` and returns them as a new batch.
    pub fn filter(&self, predicate: &BooleanArray) -> Result<Self> {
        filter_record_batch(&self.record_batch, predicate)
            .context(ComputeArrowSnafu)
            .map(Self::new)
    }

    /// Returns the index of the sequence column, the first of the internal columns.
    #[allow(dead_code)]
    pub(crate) fn sequence_column_index(&self) -> usize {
        let num_columns = self.record_batch.num_columns();
        num_columns - PLAIN_FIXED_POS_COLUMN_NUM
    }
}
/// Helper that completes a [RecordBatch] with default values for missing columns and
/// with the internal columns.
pub struct ColumnFiller<'a> {
    /// Metadata of the region; defines which columns the output contains and their order.
    metadata: &'a RegionMetadata,
    /// Schema of the output record batch.
    schema: SchemaRef,
    /// Maps column names to their index in the input record batch given to [Self::new].
    name_to_index: HashMap<String, usize>,
}

impl<'a> ColumnFiller<'a> {
    /// Creates a new [ColumnFiller].
    ///
    /// The `schema` is the sst schema of the `metadata`: one field per region column
    /// followed by the internal columns (checked in debug builds only).
    /// The name lookup table is built from the schema of `record_batch`.
    pub fn new(
        metadata: &'a RegionMetadata,
        schema: SchemaRef,
        record_batch: &RecordBatch,
    ) -> Self {
        debug_assert_eq!(
            metadata.column_metadatas.len() + PLAIN_FIXED_POS_COLUMN_NUM,
            schema.fields().len()
        );

        // Pre-construct the name to index map.
        let name_to_index: HashMap<_, _> = record_batch
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(i, field)| (field.name().clone(), i))
            .collect();

        Self {
            metadata,
            schema,
            name_to_index,
        }
    }

    /// Fills default values and internal columns for a [RecordBatch].
    ///
    /// Columns present in the input are reused. A missing column gets its default value
    /// for `OpType::Put` and a padding value for `OpType::Delete`. The sequence and op
    /// type columns are appended last, each filled with the given constant.
    ///
    /// # Errors
    /// Returns an error if a default value cannot be created or if the resulting
    /// columns do not match the output schema.
    // NOTE(review): column indices come from the batch passed to `new`; this assumes
    // `record_batch` has the same column layout — confirm at the call sites.
    pub fn fill_missing_columns(
        &self,
        record_batch: &RecordBatch,
        sequence: SequenceNumber,
        op_type: OpType,
    ) -> Result<RecordBatch> {
        let num_rows = record_batch.num_rows();
        // The output has one column per region column plus the internal columns,
        // regardless of how many columns the input batch carries.
        let mut new_columns = Vec::with_capacity(
            self.metadata.column_metadatas.len() + PLAIN_FIXED_POS_COLUMN_NUM,
        );

        // Fills default values.
        // Implementation based on `WriteRequest::fill_missing_columns()`.
        for column in &self.metadata.column_metadatas {
            let array = match self.name_to_index.get(&column.column_schema.name) {
                Some(index) => record_batch.column(*index).clone(),
                None => match op_type {
                    OpType::Put => {
                        // For put requests, we use the default value from column schema.
                        fill_column_put_default(self.metadata.region_id, column, num_rows)?
                    }
                    OpType::Delete => {
                        // For delete requests, we need default value for padding.
                        fill_column_delete_default(column, num_rows)?
                    }
                },
            };
            new_columns.push(array);
        }

        // Adds internal columns: the sequence number first, then the op type.
        let sequence_array = Arc::new(UInt64Array::from(vec![sequence; num_rows]));
        let op_type_array = Arc::new(UInt8Array::from(vec![op_type as u8; num_rows]));
        new_columns.push(sequence_array);
        new_columns.push(op_type_array);

        RecordBatch::try_new(self.schema.clone(), new_columns).context(NewRecordBatchSnafu)
    }
}
/// Builds an array of `num_rows` default values for `column` of a put request.
///
/// # Errors
/// Fails if the default value is impure, if creating the default vector fails, or if
/// the column has no default value.
fn fill_column_put_default(
    region_id: RegionId,
    column: &ColumnMetadata,
    num_rows: usize,
) -> Result<ArrayRef> {
    let column_schema = &column.column_schema;

    if column_schema.is_default_impure() {
        let reason = format!(
            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
            region_id,
            column_schema.name,
            column_schema.default_constraint(),
        );
        return UnexpectedSnafu { reason }.fail();
    }

    let maybe_vector = column_schema
        .create_default_vector(num_rows)
        .context(CreateDefaultSnafu {
            region_id,
            column: &column_schema.name,
        })?;

    // `None` here means the column doesn't have a default value.
    let vector = maybe_vector.with_context(|| InvalidRequestSnafu {
        region_id,
        reason: format!(
            "column {} does not have default value",
            column_schema.name
        ),
    })?;

    Ok(vector.to_arrow_array())
}
/// Builds an array of `num_rows` padding values for `column` of a delete request.
fn fill_column_delete_default(column: &ColumnMetadata, num_rows: usize) -> Result<ArrayRef> {
    // Delete requests only need some value to pad the missing column with.
    let padding = column
        .column_schema
        .create_default_vector_for_padding(num_rows);
    let array = padding.to_arrow_array();
    Ok(array)
}
// Tests for `ColumnFiller` and `PlainBatch`.
#[cfg(test)]
mod tests {
use api::v1::SemanticType;
use datatypes::arrow::array::{
Float64Array, Int32Array, StringArray, TimestampMillisecondArray,
};
use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datatypes::schema::ColumnSchema;
use datatypes::schema::constraint::ColumnDefaultConstraint;
use datatypes::value::Value;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use store_api::storage::{ConcreteDataType, RegionId};
use super::*;
use crate::sst::to_plain_sst_arrow_schema;
/// Creates a test region metadata with schema: k0(string), ts(timestamp), v1(float64)
///
/// `k0` is the only primary key column and `v1` has the default value 42.0.
fn create_test_region_metadata() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(100, 200));
builder
// Add string key column
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false)
.with_default_constraint(None)
.unwrap(),
semantic_type: SemanticType::Tag,
column_id: 0,
})
// Add timestamp column
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true)
.with_default_constraint(None)
.unwrap(),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
// Add float value column with default
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Float64(
datatypes::value::OrderedFloat::from(42.0),
))))
.unwrap(),
semantic_type: SemanticType::Field,
column_id: 2,
})
.primary_key(vec![0]);
builder.build().unwrap()
}
// A put with `v1` missing gets the column default (42.0) plus the internal columns.
#[test]
fn test_column_filler_put() {
let region_metadata = create_test_region_metadata();
let output_schema = to_plain_sst_arrow_schema(&region_metadata);
// Create input record batch with only k0 and ts columns (v1 is missing)
let input_schema = Arc::new(Schema::new(vec![
Field::new("k0", DataType::Utf8, false),
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
]));
let k0_values: ArrayRef = Arc::new(StringArray::from(vec!["key1", "key2"]));
let ts_values: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
let input_batch =
RecordBatch::try_new(input_schema, vec![k0_values.clone(), ts_values.clone()]).unwrap();
// Create column filler
let filler = ColumnFiller::new(&region_metadata, output_schema.clone(), &input_batch);
// Fill missing columns with OpType::Put
let result = filler
.fill_missing_columns(&input_batch, 100, OpType::Put)
.unwrap();
// Verify the result
// Create an expected record batch to compare against
let expected_columns = vec![
k0_values.clone(),
ts_values.clone(),
Arc::new(Float64Array::from(vec![42.0, 42.0])),
Arc::new(UInt64Array::from(vec![100, 100])),
Arc::new(UInt8Array::from(vec![OpType::Put as u8, OpType::Put as u8])),
];
let expected_batch = RecordBatch::try_new(output_schema.clone(), expected_columns).unwrap();
assert_eq!(expected_batch, result);
}
// A delete with `v1` missing gets null padding for `v1` plus the internal columns.
#[test]
fn test_column_filler_delete() {
let region_metadata = create_test_region_metadata();
let output_schema = to_plain_sst_arrow_schema(&region_metadata);
// Create input record batch with only k0 and ts columns (v1 is missing)
let input_schema = Arc::new(Schema::new(vec![
Field::new("k0", DataType::Utf8, false),
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
]));
let k0_values: ArrayRef = Arc::new(StringArray::from(vec!["key1", "key2"]));
let ts_values: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
let input_batch =
RecordBatch::try_new(input_schema, vec![k0_values.clone(), ts_values.clone()]).unwrap();
// Create column filler
let filler = ColumnFiller::new(&region_metadata, output_schema.clone(), &input_batch);
// Fill missing columns with OpType::Delete
let result = filler
.fill_missing_columns(&input_batch, 200, OpType::Delete)
.unwrap();
// Verify the result by creating an expected record batch to compare against
let v1_default = Arc::new(Float64Array::from(vec![None, None]));
let expected_columns = vec![
k0_values.clone(),
ts_values.clone(),
v1_default,
Arc::new(UInt64Array::from(vec![200, 200])),
Arc::new(UInt8Array::from(vec![
OpType::Delete as u8,
OpType::Delete as u8,
])),
];
let expected_batch = RecordBatch::try_new(output_schema.clone(), expected_columns).unwrap();
assert_eq!(expected_batch, result);
}
// Builds a three-row record batch: col1(int32), col2(string) and the two internal columns.
fn create_test_record_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("col1", DataType::Int32, false),
Field::new("col2", DataType::Utf8, false),
Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
]));
let col1 = Arc::new(Int32Array::from(vec![1, 2, 3]));
let col2 = Arc::new(StringArray::from(vec!["a", "b", "c"]));
let sequence = Arc::new(UInt64Array::from(vec![100, 101, 102]));
let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1]));
RecordBatch::try_new(schema, vec![col1, col2, sequence, op_type]).unwrap()
}
// Checks the accessors of `PlainBatch` against the four-column test batch.
#[test]
fn test_plain_batch_basic_methods() {
let record_batch = create_test_record_batch();
let plain_batch = PlainBatch::new(record_batch.clone());
// Test basic properties
assert_eq!(plain_batch.num_columns(), 4);
assert_eq!(plain_batch.num_rows(), 3);
assert!(!plain_batch.is_empty());
assert_eq!(plain_batch.columns().len(), 4);
// Test internal columns access
let internal_columns = plain_batch.internal_columns();
assert_eq!(internal_columns.len(), PLAIN_FIXED_POS_COLUMN_NUM);
assert_eq!(internal_columns[0].len(), 3);
assert_eq!(internal_columns[1].len(), 3);
// Test column access
let col1 = plain_batch.column(0);
assert_eq!(col1.len(), 3);
assert_eq!(
col1.as_any().downcast_ref::<Int32Array>().unwrap().value(0),
1
);
// Test sequence column index
assert_eq!(plain_batch.sequence_column_index(), 2);
// Test to record batch.
assert_eq!(record_batch, *plain_batch.as_record_batch());
assert_eq!(record_batch, plain_batch.into_record_batch());
}
// `with_new_columns` keeps the schema and swaps in the given column data.
#[test]
fn test_with_new_columns() {
let record_batch = create_test_record_batch();
let plain_batch = PlainBatch::new(record_batch);
// Create new columns
let col1 = Arc::new(Int32Array::from(vec![10, 20, 30]));
let col2 = Arc::new(StringArray::from(vec!["x", "y", "z"]));
let sequence = Arc::new(UInt64Array::from(vec![200, 201, 202]));
let op_type = Arc::new(UInt8Array::from(vec![0, 0, 0]));
let new_batch = plain_batch
.with_new_columns(vec![col1, col2, sequence, op_type])
.unwrap();
assert_eq!(new_batch.num_columns(), 4);
assert_eq!(new_batch.num_rows(), 3);
assert_eq!(
new_batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.value(0),
10
);
assert_eq!(
new_batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(0),
"x"
);
}
// `filter` keeps only the rows whose predicate value is true.
#[test]
fn test_filter() {
let record_batch = create_test_record_batch();
let plain_batch = PlainBatch::new(record_batch);
// Create a predicate that selects the first and third rows
let predicate = BooleanArray::from(vec![true, false, true]);
let filtered_batch = plain_batch.filter(&predicate).unwrap();
assert_eq!(filtered_batch.num_rows(), 2);
assert_eq!(
filtered_batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.value(0),
1
);
assert_eq!(
filtered_batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.value(1),
3
);
}
}

View File

@@ -60,7 +60,7 @@ use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::read::{BoxedRecordBatchStream, RecordBatch};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
@@ -1031,39 +1031,6 @@ impl ScanInput {
self
}
/// Turns `sources` into sources that are scanned by background tasks.
///
/// With zero or one source the input is returned unchanged. Otherwise one scan task
/// is spawned per source and each source is replaced by a stream that receives the
/// task's batches through a channel of capacity `channel_size`.
// NOTE(review): the previous doc said this panics if the input doesn't allow parallel
// scan; no such check is visible in this function — confirm.
#[tracing::instrument(
    skip(self, sources, semaphore),
    fields(
        region_id = %self.region_metadata().region_id,
        source_count = sources.len()
    )
)]
pub(crate) fn create_parallel_sources(
    &self,
    sources: Vec<Source>,
    semaphore: Arc<Semaphore>,
    channel_size: usize,
) -> Result<Vec<Source>> {
    // A single source gains nothing from a background task.
    if sources.len() <= 1 {
        return Ok(sources);
    }

    let mut parallel_sources = Vec::with_capacity(sources.len());
    for source in sources {
        // One task and one channel per source.
        let (sender, receiver) = mpsc::channel(channel_size);
        self.spawn_scan_task(source, semaphore.clone(), sender);
        let stream = Box::pin(ReceiverStream::new(receiver));
        parallel_sources.push(Source::Stream(stream));
    }
    Ok(parallel_sources)
}
/// Builds memtable ranges to scan by `index`.
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
let memtable = &self.memtables[index.index];
@@ -1173,49 +1140,6 @@ impl ScanInput {
Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
}
/// Scans the input source in another task and sends batches to the sender.
///
/// The task acquires a permit from `semaphore` for every read and releases it before
/// sending. It stops when the source is exhausted, after forwarding a read error, or
/// as soon as a batch cannot be sent because the receiving side is gone.
#[tracing::instrument(
    skip(self, input, semaphore, sender),
    fields(region_id = %self.region_metadata().region_id)
)]
pub(crate) fn spawn_scan_task(
    &self,
    mut input: Source,
    semaphore: Arc<Semaphore>,
    sender: mpsc::Sender<Result<Batch>>,
) {
    let region_id = self.region_metadata().region_id;
    let span = tracing::info_span!(
        "ScanInput::parallel_scan_task",
        region_id = %region_id,
        stream_kind = "batch"
    );
    common_runtime::spawn_global(
        async move {
            loop {
                // We release the permit before sending result to avoid the task waiting on
                // the channel with the permit held.
                let maybe_batch = {
                    // Safety: We never close the semaphore.
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next_batch().await
                };
                match maybe_batch {
                    Ok(Some(batch)) => {
                        if sender.send(Ok(batch)).await.is_err() {
                            // Nobody is listening anymore; reading further batches
                            // would only waste permits and I/O.
                            break;
                        }
                    }
                    Ok(None) => break,
                    Err(e) => {
                        // Best effort: the task ends whether or not the error is delivered.
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                }
            }
        }
        .instrument(span),
    );
}
/// Scans flat sources (RecordBatch streams) in parallel.
///
/// # Panics if the input doesn't allow parallel scan.

View File

@@ -39,7 +39,7 @@ use crate::metrics::{
READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::read::flat_merge::{MergeMetrics, MergeMetricsReport};
use crate::read::pruner::PartitionPruner;
use crate::read::range::{RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;

View File

@@ -218,34 +218,6 @@ pub(crate) fn internal_fields() -> [FieldRef; 3] {
]
}
/// Gets the arrow schema to store in parquet.
pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
let fields = Fields::from_iter(
metadata
.schema
.arrow_schema()
.fields()
.iter()
.cloned()
.chain(plain_internal_fields()),
);
Arc::new(Schema::new(fields))
}
/// Fields for internal columns.
///
/// Returns the sequence field (`UInt64`) followed by the op type field (`UInt8`).
fn plain_internal_fields() -> [FieldRef; 2] {
    // Internal columns are always not null.
    let sequence = Field::new(SEQUENCE_COLUMN_NAME, ArrowDataType::UInt64, false);
    let op_type = Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false);

    [Arc::new(sequence), Arc::new(op_type)]
}
/// Gets the estimated number of series from record batches.
///
/// This struct tracks the last timestamp value to detect series boundaries

View File

@@ -36,10 +36,10 @@ use store_api::metric_engine_consts::{
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{FileId, RegionId};
use crate::read::{Batch, FlatSource, Source};
use crate::read::{Batch, FlatSource};
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::test_util::{VecBatchReader, new_batch_builder, new_noop_file_purger};
use crate::test_util::{new_batch_builder, new_noop_file_purger};
/// Test region id.
const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -190,12 +190,6 @@ pub fn new_sparse_primary_key(
buffer
}
/// Creates a [Source] from `batches`.
///
/// The batches are wrapped in a [VecBatchReader], which is boxed and exposed as
/// [Source::Reader].
pub fn new_source(batches: &[Batch]) -> Source {
    Source::Reader(Box::new(VecBatchReader::new(batches)))
}
/// Creates a SST file handle with provided file id
pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) -> FileHandle {
let file_purger = new_noop_file_purger();