### Added Deduplication and Merge Functionality

- Introduced `DedupReader` and `MergeReader` in `src/mito2/src/read/sync/dedup.rs` and `src/mito2/src/read/sync/merge.rs` to handle deduplication and merging of sorted batches.

### Enhanced `BulkMemtable` Iteration

- Updated `BulkMemtable` in `src/mito2/src/memtable/bulk.rs` to support deduplication and merge modes during iteration (composed as in the sketch after this list).
- Added `BulkIterContext` to manage iteration context.

### Testing Enhancements

- Added comprehensive tests for `BulkMemtable` and `BulkPart` in `src/mito2/src/memtable/bulk.rs` and `src/mito2/src/memtable/bulk/part.rs`.

### Code Refactoring

- Made `BulkPart` and `BulkPartMeta` cloneable in `src/mito2/src/memtable/bulk/part.rs`.
- Exposed internal test modules for better test coverage in `src/mito2/src/memtable/time_series.rs` and `src/mito2/src/read/merge.rs`.

### New Modules

- Created `sync` module in `src/mito2/src/read.rs` to organize synchronous read operations.
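Taken together, the new pieces compose like this. A minimal sketch condensed from the `BulkMemtable::iter` diff below; `parts`, `region_metadata`, `projection`, `predicate`, `sequence`, `merge_mode`, and `dedup` stand in for the memtable's fields and arguments, and error handling is simplified:

```rust
// Build a shared iteration context for all bulk parts.
let ctx = Arc::new(BulkIterContext::new(region_metadata, &projection, predicate));

// One sync reader per bulk part that matches the predicate.
let mut readers = Vec::new();
for part in parts {
    if let Some(reader) = part.read(ctx.clone(), sequence)? {
        readers.push(reader);
    }
}

// Merge-sort the part readers, then dedup according to the region's merge mode.
let merged = crate::read::sync::merge::MergeReader::new(readers)?;
let iter: BoxedBatchIterator = match merge_mode {
    MergeMode::LastRow => Box::new(DedupReader::new(merged, LastRow::new(dedup))),
    MergeMode::LastNonNull => Box::new(DedupReader::new(merged, LastNonNull::new(dedup))),
};
```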
Author: Lei, HUANG
Date: 2025-02-20 08:54:44 +00:00
Parent: 91d755d9b5
Commit: 68593ae92a
8 changed files with 679 additions and 57 deletions

src/mito2/src/memtable/bulk.rs

@@ -24,6 +24,7 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
@@ -31,7 +32,8 @@ use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRanges, MemtableRef, MemtableStats,
};
use crate::read::Batch;
use crate::read::dedup::{LastNonNull, LastRow};
use crate::read::sync::dedup::DedupReader;
use crate::region::options::MergeMode;
#[allow(unused)]
@@ -123,16 +125,6 @@ impl BulkMemtable {
}
}
struct EmptyIter;
impl Iterator for EmptyIter {
type Item = Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
impl Memtable for BulkMemtable {
fn id(&self) -> MemtableId {
self.id
@@ -170,13 +162,33 @@ impl Memtable for BulkMemtable {
fn iter(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
_sequence: Option<SequenceNumber>,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
) -> Result<BoxedBatchIterator> {
//todo(hl): temporarily disable reads.
//todo(hl): we should also consider dedup and merge mode when reading bulk parts,
Ok(Box::new(EmptyIter))
let mut readers = Vec::new();
let parts = self.parts.read().unwrap();
let ctx = Arc::new(BulkIterContext::new(
self.region_metadata.clone(),
&projection,
predicate.clone(),
));
for part in parts.as_slice() {
if let Some(reader) = part.read(ctx.clone(), sequence).unwrap() {
readers.push(reader);
}
}
let merge_reader = crate::read::sync::merge::MergeReader::new(readers)?;
let reader = match self.merge_mode {
MergeMode::LastRow => {
Box::new(DedupReader::new(merge_reader, LastRow::new(self.dedup))) as BoxedBatchIterator
}
MergeMode::LastNonNull => {
Box::new(DedupReader::new(merge_reader, LastNonNull::new(self.dedup))) as BoxedBatchIterator
}
};
Ok(reader)
}
fn ranges(
@@ -240,3 +252,128 @@ impl Memtable for BulkMemtable {
))
}
}
#[cfg(test)]
mod tests {
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{OpType, Row, Rows, SemanticType};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use std::sync::Arc;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::RegionId;
use crate::memtable::bulk::part::BulkPartEncoder;
use crate::memtable::bulk::BulkMemtable;
use crate::memtable::{BulkPart, Memtable};
use crate::region::options::MergeMode;
fn metrics_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k0", ConcreteDataType::binary_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v0", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
})
.primary_key(vec![0]);
let region_metadata = builder.build().unwrap();
Arc::new(region_metadata)
}
fn metrics_column_schema() -> Vec<api::v1::ColumnSchema> {
let schema = metrics_region_metadata();
schema
.column_metadatas
.iter()
.map(|c| api::v1::ColumnSchema {
column_name: c.column_schema.name.clone(),
datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
.unwrap()
.datatype() as i32,
semantic_type: c.semantic_type as i32,
..Default::default()
})
.collect()
}
fn build_metrics_bulk_part(
k: &str,
ts: &[i64],
v0: &[Option<f64>],
v1: &[Option<f64>],
seq: u64,
) -> BulkPart {
assert_eq!(ts.len(), v0.len());
assert_eq!(ts.len(), v1.len());
let rows = ts
.iter()
.zip(v0.iter())
.zip(v1.iter())
.map(|((ts, v0), v1)| Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::BinaryValue(k.as_bytes().to_vec())),
},
api::v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(*ts as i64)),
},
api::v1::Value {
value_data: v0.map(ValueData::F64Value),
},
api::v1::Value {
value_data: v1.map(ValueData::F64Value),
},
],
})
.collect::<Vec<_>>();
let mutation = api::v1::Mutation {
op_type: OpType::Put as i32,
sequence: seq,
rows: Some(Rows {
schema: metrics_column_schema(),
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
let encoder = BulkPartEncoder::new(metrics_region_metadata(), true, 1024);
encoder.encode_mutations(&[mutation]).unwrap().unwrap()
}
#[test]
fn test_bulk_iter() {
let schema = metrics_region_metadata();
let memtable = BulkMemtable::new(schema, 0, None, true, MergeMode::LastRow);
memtable.write_bulk(build_metrics_bulk_part("a", &[1], &[None], &[Some(1.0)], 0)).unwrap();
// write duplicated rows
memtable.write_bulk(build_metrics_bulk_part("a", &[1], &[None], &[Some(1.0)], 0)).unwrap();
let iter = memtable.iter(None, None, None).unwrap();
let total_rows = iter.map(|b| b.unwrap().num_rows()).sum::<usize>();
assert_eq!(1, total_rows);
}
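// Hypothetical companion test (not part of this commit): a sketch of how
// `MergeMode::LastNonNull` could be exercised with the same helpers, assuming
// the constructor and iterator signatures used in `test_bulk_iter` above.
#[test]
fn test_bulk_iter_last_non_null() {
    let schema = metrics_region_metadata();
    let memtable = BulkMemtable::new(schema, 0, None, true, MergeMode::LastNonNull);
    // The older duplicate carries v0, the newer one carries v1.
    memtable
        .write_bulk(build_metrics_bulk_part("a", &[1], &[Some(2.0)], &[None], 0))
        .unwrap();
    memtable
        .write_bulk(build_metrics_bulk_part("a", &[1], &[None], &[Some(1.0)], 1))
        .unwrap();
    // LastNonNull still emits a single row per (primary key, timestamp); null
    // fields of the newest row are backfilled from older duplicates.
    let iter = memtable.iter(None, None, None).unwrap();
    let total_rows = iter.map(|b| b.unwrap().num_rows()).sum::<usize>();
    assert_eq!(1, total_rows);
}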
}

src/mito2/src/memtable/bulk/part.rs

@@ -56,7 +56,7 @@ use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BulkPart {
pub(crate) data: Bytes,
metadata: BulkPartMeta,
@@ -95,7 +95,7 @@ impl BulkPart {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
/// Total rows in part.
pub num_rows: usize,
@@ -219,7 +219,6 @@ fn mutations_to_record_batch(
for row in key_values.iter() {
assert_eq!(1, row.num_primary_keys());
assert_eq!(1, row.num_fields());
let first_primary_key_col = row.primary_keys().next().unwrap();
let bytes = match first_primary_key_col {
@@ -282,7 +281,7 @@ struct ArraysSorter<I> {
impl<I> ArraysSorter<I>
where
I: Iterator<Item = ArrayRef>,
I: Iterator<Item=ArrayRef>,
{
/// Converts arrays to record batch.
fn sort(self) -> Result<(RecordBatch, i64, i64)> {
@@ -330,10 +329,10 @@ where
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap(),
.context(ComputeArrowSnafu)?
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap(),
)?) as ArrayRef;
let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
@@ -346,7 +345,7 @@ where
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?,
.context(ComputeArrowSnafu)?,
);
}
@@ -357,7 +356,7 @@ where
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?;
.context(ComputeArrowSnafu)?;
arrays.push(timestamp);
arrays.push(pk_dictionary);
@@ -369,7 +368,7 @@ where
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?,
.context(ComputeArrowSnafu)?,
);
arrays.push(
@@ -380,7 +379,7 @@ where
check_bounds: false,
}),
)
.context(ComputeArrowSnafu)?,
.context(ComputeArrowSnafu)?,
);
let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
@@ -392,7 +391,7 @@ where
fn timestamp_array_to_iter(
timestamp_unit: TimeUnit,
timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
) -> impl Iterator<Item=&i64> {
match timestamp_unit {
// safety: timestamp column must be valid.
TimeUnit::Second => timestamp
@@ -454,7 +453,7 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::collections::VecDeque;
use datafusion_common::ScalarValue;
@@ -520,19 +519,19 @@ mod tests {
);
}
struct MutationInput<'a> {
k0: &'a str,
k1: u32,
timestamps: &'a [i64],
v1: &'a [Option<f64>],
sequence: u64,
pub(crate) struct MutationInput<'a> {
pub(crate) k0: &'a str,
pub(crate) k1: u32,
pub(crate) timestamps: &'a [i64],
pub(crate) v1: &'a [Option<f64>],
pub(crate) sequence: u64,
}
#[derive(Debug, PartialOrd, PartialEq)]
struct BatchOutput<'a> {
pk_values: &'a [Value],
timestamps: &'a [i64],
v1: &'a [Option<f64>],
pub(crate) struct BatchOutput<'a> {
pub(crate) pk_values: &'a [Value],
pub(crate) timestamps: &'a [i64],
pub(crate) v1: &'a [Option<f64>],
}
fn check_mutations_to_record_batches(
@@ -553,7 +552,7 @@ mod tests {
m.v1.iter().copied(),
m.sequence,
)
.mutation
.mutation
})
.collect::<Vec<_>>();
let total_rows: usize = mutations
@@ -761,7 +760,7 @@ mod tests {
);
}
fn encode(input: &[MutationInput]) -> BulkPart {
pub(crate) fn encode(input: &[MutationInput]) -> BulkPart {
let metadata = metadata_for_test();
let mutations = input
.iter()
@@ -774,7 +773,7 @@ mod tests {
m.v1.iter().copied(),
m.sequence,
)
.mutation
.mutation
})
.collect::<Vec<_>>();
let encoder = BulkPartEncoder::new(metadata, true, 1024);

src/mito2/src/memtable/time_series.rs

@@ -912,7 +912,7 @@ impl IterBuilder for TimeSeriesIterBuilder {
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::collections::{HashMap, HashSet};
use api::helper::ColumnDataTypeWrapper;
@@ -929,7 +929,7 @@ mod tests {
use super::*;
use crate::row_converter::SortField;
fn schema_for_test() -> RegionMetadataRef {
pub(crate) fn schema_for_test() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
@@ -1143,7 +1143,7 @@ mod tests {
)
}
fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
pub(crate) fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
let column_schema = schema
.column_metadatas
.iter()

src/mito2/src/read.rs

@@ -26,6 +26,8 @@ pub(crate) mod scan_util;
pub(crate) mod seq_scan;
pub(crate) mod unordered_scan;
pub(crate) mod sync;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

src/mito2/src/read/merge.rs

@@ -284,23 +284,23 @@ impl MergeReaderBuilder {
/// Metrics for the merge reader.
#[derive(Debug, Default)]
struct Metrics {
pub(crate) struct Metrics {
/// Total scan cost of the reader.
scan_cost: Duration,
pub(crate) scan_cost: Duration,
/// Number of times to fetch batches.
num_fetch_by_batches: usize,
pub(crate) num_fetch_by_batches: usize,
/// Number of times to fetch rows.
num_fetch_by_rows: usize,
pub(crate) num_fetch_by_rows: usize,
/// Number of input rows.
num_input_rows: usize,
pub(crate) num_input_rows: usize,
/// Number of output rows.
num_output_rows: usize,
pub(crate) num_output_rows: usize,
/// Cost to fetch batches from sources.
fetch_cost: Duration,
pub(crate) fetch_cost: Duration,
}
/// A `Node` represents an individual input data source to be merged.
struct Node {
pub(crate) struct Node {
/// Data source of this `Node`.
source: Source,
/// Current batch to be read. The node ensures the batch is not empty.
@@ -313,7 +313,7 @@ impl Node {
/// Initialize a node.
///
/// It tries to fetch one batch from the `source`.
async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
pub(crate) async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
// Ensures batch is not empty.
let start = Instant::now();
let current_batch = source.next_batch().await?.map(CompareFirst);
@@ -432,7 +432,7 @@ impl Ord for Node {
/// Type to compare [Batch] by first row.
///
/// It ignores op type as sequence is enough to distinguish different rows.
struct CompareFirst(Batch);
pub(crate) struct CompareFirst(pub(crate) Batch);
impl PartialEq for CompareFirst {
fn eq(&self, other: &Self) -> bool {

src/mito2/src/read/sync.rs

@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod merge;
pub mod dedup;

src/mito2/src/read/sync/dedup.rs

@@ -0,0 +1,84 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Sync dedup reader implementation.
use common_telemetry::debug;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::dedup::{DedupMetrics, DedupStrategy};
use crate::read::Batch;
/// A sync version of the reader that dedups sorted batches from a source based on the
/// dedup strategy.
pub(crate) struct DedupReader<R, S> {
source: R,
strategy: S,
metrics: DedupMetrics,
}
impl<R, S> DedupReader<R, S> {
/// Creates a new dedup reader.
pub(crate) fn new(source: R, strategy: S) -> Self {
Self {
source,
strategy,
metrics: DedupMetrics::default(),
}
}
}
impl<R: Iterator<Item = crate::error::Result<Batch>>, S: DedupStrategy> DedupReader<R, S> {
/// Returns the next deduplicated batch.
fn fetch_next_batch(&mut self) -> Option<crate::error::Result<Batch>> {
while let Some(res) = self.source.next() {
match res {
Ok(batch) => {
if let Some(batch) = self
.strategy
.push_batch(batch, &mut self.metrics)
.transpose()
{
return Some(batch);
}
}
Err(err) => return Some(Err(err)),
}
}
self.strategy.finish(&mut self.metrics).transpose()
}
}
impl<R: Iterator<Item = crate::error::Result<Batch>>, S: DedupStrategy> Iterator
for DedupReader<R, S>
{
type Item = crate::error::Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
self.fetch_next_batch()
}
}
impl<R, S> Drop for DedupReader<R, S> {
fn drop(&mut self) {
debug!("Sync dedup reader finished, metrics: {:?}", self.metrics);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["dedup"])
.inc_by(self.metrics.num_unselected_rows as u64);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["delete"])
.inc_by(self.metrics.num_deleted_rows as u64);
}
}
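// Usage sketch (illustrative, not part of this file): `DedupReader` wraps any
// `Iterator<Item = Result<Batch>>` that yields sorted batches, such as the
// sync `MergeReader` from the sibling `merge` module:
//
//     let merged = crate::read::sync::merge::MergeReader::new(readers)?;
//     let deduped = DedupReader::new(merged, LastRow::new(true));
//     for batch in deduped {
//         let batch = batch?; // sorted, deduplicated `Batch`
//     }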

src/mito2/src/read/sync/merge.rs

@@ -0,0 +1,384 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Sync merge reader implementation.
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::Instant;
use common_telemetry::debug;
use crate::error;
use crate::memtable::BoxedBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::Batch;
use crate::read::merge::{CompareFirst, Metrics};
/// A `Node` represents an individual input data source to be merged.
pub(crate) struct Node {
/// Data source of this `Node`.
source: BoxedBatchIterator,
/// Current batch to be read. The node ensures the batch is not empty.
///
/// `None` means the `source` has reached EOF.
current_batch: Option<CompareFirst>,
}
impl Node {
/// Initialize a node.
///
/// It tries to fetch one batch from the `source`.
pub(crate) fn new(
mut source: BoxedBatchIterator,
metrics: &mut Metrics,
) -> error::Result<Node> {
// Ensures batch is not empty.
let start = Instant::now();
let current_batch = source.next().transpose()?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);
Ok(Node {
source,
current_batch,
})
}
/// Returns whether the node still has batch to read.
fn is_eof(&self) -> bool {
self.current_batch.is_none()
}
/// Returns the primary key of current batch.
///
/// # Panics
/// Panics if the node has reached EOF.
fn primary_key(&self) -> &[u8] {
self.current_batch().primary_key()
}
/// Returns current batch.
///
/// # Panics
/// Panics if the node has reached EOF.
fn current_batch(&self) -> &Batch {
&self.current_batch.as_ref().unwrap().0
}
/// Returns current batch and fetches next batch
/// from the source.
///
/// # Panics
/// Panics if the node has reached EOF.
fn fetch_batch(&mut self, metrics: &mut Metrics) -> error::Result<Batch> {
let current = self.current_batch.take().unwrap();
let start = Instant::now();
// Ensures batch is not empty.
self.current_batch = self.source.next().transpose()?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += self
.current_batch
.as_ref()
.map(|b| b.0.num_rows())
.unwrap_or(0);
Ok(current.0)
}
/// Returns true if the key range of current batch in `self` is behind (exclusive) current
/// batch in `other`.
///
/// # Panics
/// Panics if either `self` or `other` is EOF.
fn is_behind(&self, other: &Node) -> bool {
debug_assert!(!self.current_batch().is_empty());
debug_assert!(!other.current_batch().is_empty());
// We only compare pk and timestamp so nodes in the cold
// heap don't have overlapping timestamps with the hottest node
// in the hot heap.
self.primary_key().cmp(other.primary_key()).then_with(|| {
self.current_batch()
.first_timestamp()
.cmp(&other.current_batch().last_timestamp())
}) == Ordering::Greater
}
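// Illustration (hypothetical batches): with identical primary keys,
// self = [ts 10..=12] and other = [ts 3..=7] gives `self.is_behind(other) == true`,
// because self's first timestamp (10) is greater than other's last timestamp (7).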
/// Skips the first `num_to_skip` rows from the node's current batch. If the current batch
/// becomes empty, it fetches the next batch from the source.
///
/// # Panics
/// Panics if the node is EOF.
fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> error::Result<()> {
let batch = self.current_batch();
debug_assert!(batch.num_rows() >= num_to_skip);
let remaining = batch.num_rows() - num_to_skip;
if remaining == 0 {
// Nothing remains, we need to fetch next batch to ensure the batch is not empty.
self.fetch_batch(metrics)?;
} else {
debug_assert!(!batch.is_empty());
self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining)));
}
Ok(())
}
}
impl PartialEq for Node {
fn eq(&self, other: &Node) -> bool {
self.current_batch == other.current_batch
}
}
impl Eq for Node {}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Node {
fn cmp(&self, other: &Node) -> Ordering {
// The std binary heap is a max heap, but we want the nodes ordered in
// ascending order, so we compare them in reverse order.
other.current_batch.cmp(&self.current_batch)
}
}
/// Reader to merge sorted batches.
///
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
/// ignore op type as sequence is already unique).
/// 2. Batches from sources **must** not be empty.
///
/// The reader won't concatenate batches. Each batch returned by the reader also doesn't
/// contain duplicate rows. But the last (primary key, timestamp) of a batch may be the same
/// as the first one in the next batch.
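// Illustration (hypothetical input): two sorted sources sharing primary key k1,
//   s1: timestamps [1, 2] at sequence 2,   s2: timestamps [2, 3] at sequence 1.
// The reader emits rows ordered by (key, timestamp, sequence desc):
//   (k1, 1, seq 2), (k1, 2, seq 2), (k1, 2, seq 1), (k1, 3, seq 1)
// No single output batch contains a duplicate, but the two (k1, 2) rows appear
// in consecutive batches; a downstream `DedupReader` removes them.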
pub struct MergeReader {
/// Holds [Node]s whose key range of current batch **is** overlapped with the merge window.
/// Each node yields batches from a `source`.
///
/// [Node] in this heap **must** not be empty. A `merge window` is the (primary key, timestamp)
/// range of the **root node** in the `hot` heap.
hot: BinaryHeap<Node>,
/// Holds `Node` whose key range of current batch **isn't** overlapped with the merge window.
///
/// `Node` in this heap **must** not be empty.
cold: BinaryHeap<Node>,
/// Batch to output.
output_batch: Option<Batch>,
/// Local metrics.
metrics: Metrics,
}
impl Drop for MergeReader {
fn drop(&mut self) {
debug!("Merge reader(sync) finished, metrics: {:?}", self.metrics);
READ_STAGE_ELAPSED
.with_label_values(&["merge"])
.observe(self.metrics.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["merge_fetch"])
.observe(self.metrics.fetch_cost.as_secs_f64());
}
}
impl Iterator for MergeReader {
type Item = error::Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
let start = Instant::now();
while !self.hot.is_empty() && self.output_batch.is_none() {
if self.hot.len() == 1 {
// No need to do merge sort if only one batch in the hot heap.
if let Err(e) = self.fetch_batch_from_hottest() {
return Some(Err(e));
}
self.metrics.num_fetch_by_batches += 1;
} else {
// We can only fetch rows from the hottest node that are less than the next node.
if let Err(e) = self.fetch_rows_from_hottest() {
return Some(Err(e));
}
self.metrics.num_fetch_by_rows += 1;
}
}
if let Some(batch) = self.output_batch.take() {
self.metrics.scan_cost += start.elapsed();
self.metrics.num_output_rows += batch.num_rows();
Some(Ok(batch))
} else {
// Nothing fetched.
self.metrics.scan_cost += start.elapsed();
None
}
}
}
impl MergeReader {
/// Creates and initializes a new [MergeReader].
pub fn new(sources: Vec<BoxedBatchIterator>) -> error::Result<MergeReader> {
let start = Instant::now();
let mut metrics = Metrics::default();
let mut cold = BinaryHeap::with_capacity(sources.len());
let hot = BinaryHeap::with_capacity(sources.len());
for source in sources {
let node = Node::new(source, &mut metrics)?;
if !node.is_eof() {
// Ensure `cold` doesn't have EOF nodes.
cold.push(node);
}
}
let mut reader = MergeReader {
hot,
cold,
output_batch: None,
metrics,
};
// Initializes the reader.
reader.refill_hot();
reader.metrics.scan_cost += start.elapsed();
Ok(reader)
}
/// Moves nodes in `cold` heap, whose key range is overlapped with current merge
/// window to `hot` heap.
fn refill_hot(&mut self) {
while !self.cold.is_empty() {
if let Some(merge_window) = self.hot.peek() {
let warmest = self.cold.peek().unwrap();
if warmest.is_behind(merge_window) {
// if the warmest node in the `cold` heap is totally after the
// `merge_window`, then no need to add more nodes into the `hot`
// heap for merge sorting.
break;
}
}
let warmest = self.cold.pop().unwrap();
self.hot.push(warmest);
}
}
/// Fetches one batch from the hottest node.
fn fetch_batch_from_hottest(&mut self) -> error::Result<()> {
assert_eq!(1, self.hot.len());
let mut hottest = self.hot.pop().unwrap();
let batch = hottest.fetch_batch(&mut self.metrics)?;
Self::maybe_output_batch(batch, &mut self.output_batch)?;
self.reheap(hottest)
}
/// Fetches non-duplicated rows from the hottest node.
fn fetch_rows_from_hottest(&mut self) -> error::Result<()> {
// Safety: the caller ensures the hot heap has more than 1 element.
// Pop hottest node.
let mut top_node = self.hot.pop().unwrap();
let top = top_node.current_batch();
// Min timestamp and its sequence in the next batch.
let next_min_ts = {
let next_node = self.hot.peek().unwrap();
let next = next_node.current_batch();
// top and next have overlapping rows so they must have the same primary key.
debug_assert_eq!(top.primary_key(), next.primary_key());
// Safety: Batches in the heap are not empty, so we can use unwrap here.
next.first_timestamp().unwrap()
};
// Safety: Batches in the heap are not empty, so we can use unwrap here.
let timestamps = top.timestamps_native().unwrap();
// Binary searches the timestamp in the top batch.
// Safety: Batches should have the same timestamp resolution so we can compare the native
// value directly.
let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) {
Ok(pos) => pos,
Err(pos) => {
// No duplicate timestamp. Outputs timestamp before `pos`.
Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
top_node.skip_rows(pos, &mut self.metrics)?;
return self.reheap(top_node);
}
};
// No need to remove duplicate timestamps.
let output_end = if duplicate_pos == 0 {
// If the first timestamp of the top node is duplicated, we can simply return
// the first row, as the heap ensures it is the one with the largest sequence.
1
} else {
// We don't know which one has the larger sequence so we use the range before
// the duplicate pos.
duplicate_pos
};
Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
top_node.skip_rows(output_end, &mut self.metrics)?;
self.reheap(top_node)
}
/// Push the node popped from `hot` back to a proper heap.
fn reheap(&mut self, node: Node) -> crate::error::Result<()> {
if node.is_eof() {
// If the node is EOF, don't put it into the heap again.
// The merge window would be updated, need to refill the hot heap.
self.refill_hot();
} else {
// Find a proper heap for this node.
let node_is_cold = if let Some(hottest) = self.hot.peek() {
// If key range of this node is behind the hottest node's then we can
// push it to the cold heap. Otherwise we should push it to the hot heap.
node.is_behind(hottest)
} else {
// The hot heap is empty, but we don't know whether the current
// batch of this node is still the hottest.
true
};
if node_is_cold {
self.cold.push(node);
} else {
self.hot.push(node);
}
// Anyway, the merge window has been changed, we need to refill the hot heap.
self.refill_hot();
}
Ok(())
}
/// Sets the `batch` as the `output_batch`.
///
/// Ignores the `batch` if it is empty.
fn maybe_output_batch(
batch: Batch,
output_batch: &mut Option<Batch>,
) -> crate::error::Result<()> {
debug_assert!(output_batch.is_none());
if batch.is_empty() {
return Ok(());
}
*output_batch = Some(batch);
Ok(())
}
}