diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs
index c33c486e9f..66d121aa47 100644
--- a/src/mito2/src/memtable/bulk.rs
+++ b/src/mito2/src/memtable/bulk.rs
@@ -24,6 +24,7 @@ use table::predicate::Predicate;
 
 use crate::error::Result;
 use crate::flush::WriteBufferManagerRef;
+use crate::memtable::bulk::context::BulkIterContext;
 use crate::memtable::bulk::part::BulkPart;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
@@ -31,7 +32,8 @@ use crate::memtable::{
     AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
     MemtableRanges, MemtableRef, MemtableStats,
 };
-use crate::read::Batch;
+use crate::read::dedup::{LastNonNull, LastRow};
+use crate::read::sync::dedup::DedupReader;
 use crate::region::options::MergeMode;
 
 #[allow(unused)]
@@ -123,16 +125,6 @@ impl BulkMemtable {
     }
 }
 
-struct EmptyIter;
-
-impl Iterator for EmptyIter {
-    type Item = Result<Batch>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        None
-    }
-}
-
 impl Memtable for BulkMemtable {
     fn id(&self) -> MemtableId {
         self.id
@@ -170,13 +162,33 @@ impl Memtable for BulkMemtable {
 
     fn iter(
         &self,
-        _projection: Option<&[ColumnId]>,
-        _predicate: Option<Predicate>,
-        _sequence: Option<SequenceNumber>,
+        projection: Option<&[ColumnId]>,
+        predicate: Option<Predicate>,
+        sequence: Option<SequenceNumber>,
     ) -> Result<BoxedBatchIterator> {
-        //todo(hl): temporarily disable reads.
-        //todo(hl): we should also consider dedup and merge mode when reading bulk parts,
-        Ok(Box::new(EmptyIter))
+        let mut readers = Vec::new();
+        let parts = self.parts.read().unwrap();
+
+        let ctx = Arc::new(BulkIterContext::new(
+            self.region_metadata.clone(),
+            &projection,
+            predicate.clone(),
+        ));
+        for part in parts.as_slice() {
+            if let Some(reader) = part.read(ctx.clone(), sequence)? {
+                readers.push(reader);
+            }
+        }
+        let merge_reader = crate::read::sync::merge::MergeReader::new(readers)?;
+        let reader = match self.merge_mode {
+            MergeMode::LastRow => {
+                Box::new(DedupReader::new(merge_reader, LastRow::new(self.dedup)))
+                    as BoxedBatchIterator
+            }
+            MergeMode::LastNonNull => {
+                Box::new(DedupReader::new(merge_reader, LastNonNull::new(self.dedup)))
+                    as BoxedBatchIterator
+            }
+        };
+        Ok(reader)
     }
 
     fn ranges(
@@ -240,3 +252,128 @@ impl Memtable for BulkMemtable {
         ))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use api::helper::ColumnDataTypeWrapper;
+    use api::v1::value::ValueData;
+    use api::v1::{OpType, Row, Rows, SemanticType};
+    use datatypes::data_type::ConcreteDataType;
+    use datatypes::schema::ColumnSchema;
+    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
+    use store_api::storage::RegionId;
+
+    use crate::memtable::bulk::part::BulkPartEncoder;
+    use crate::memtable::bulk::BulkMemtable;
+    use crate::memtable::{BulkPart, Memtable};
+    use crate::region::options::MergeMode;
+
+    fn metrics_region_metadata() -> RegionMetadataRef {
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("k0", ConcreteDataType::binary_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 0,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "ts",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("v0", ConcreteDataType::float64_datatype(), true),
+                semantic_type: SemanticType::Field,
+                column_id: 2,
+            })
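+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
+                semantic_type: SemanticType::Field,
+                column_id: 3,
+            })
+            .primary_key(vec![0]);
+        let region_metadata = builder.build().unwrap();
+        Arc::new(region_metadata)
+    }
+
+    fn metrics_column_schema() -> Vec<api::v1::ColumnSchema> {
+        let schema = metrics_region_metadata();
+        schema
+            .column_metadatas
+            .iter()
+            .map(|c| api::v1::ColumnSchema {
+                column_name: c.column_schema.name.clone(),
+                datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
+                    .unwrap()
+                    .datatype() as i32,
+                semantic_type: c.semantic_type as i32,
+                ..Default::default()
+            })
+            .collect()
+    }
+
+    fn build_metrics_bulk_part(
+        k: &str,
+        ts: &[i64],
+        v0: &[Option<f64>],
+        v1: &[Option<f64>],
+        seq: u64,
+    ) -> BulkPart {
+        assert_eq!(ts.len(), v0.len());
+        assert_eq!(ts.len(), v1.len());
+
+        let rows = ts
+            .iter()
+            .zip(v0.iter())
+            .zip(v1.iter())
+            .map(|((ts, v0), v1)| Row {
+                values: vec![
+                    api::v1::Value {
+                        value_data: Some(ValueData::BinaryValue(k.as_bytes().to_vec())),
+                    },
+                    api::v1::Value {
+                        value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
+                    },
+                    api::v1::Value {
+                        value_data: v0.map(ValueData::F64Value),
+                    },
+                    api::v1::Value {
+                        value_data: v1.map(ValueData::F64Value),
+                    },
+                ],
+            })
+            .collect::<Vec<_>>();
+
+        let mutation = api::v1::Mutation {
+            op_type: OpType::Put as i32,
+            sequence: seq,
+            rows: Some(Rows {
+                schema: metrics_column_schema(),
+                rows,
+            }),
+            write_hint: None,
+            bulk: Vec::new(),
+        };
+        let encoder = BulkPartEncoder::new(metrics_region_metadata(), true, 1024);
+        encoder.encode_mutations(&[mutation]).unwrap().unwrap()
+    }
+
+    #[test]
+    fn test_bulk_iter() {
+        let schema = metrics_region_metadata();
+        let memtable = BulkMemtable::new(schema, 0, None, true, MergeMode::LastRow);
+        memtable
+            .write_bulk(build_metrics_bulk_part("a", &[1], &[None], &[Some(1.0)], 0))
+            .unwrap();
+        // Write duplicated rows; `LastRow` dedup should keep only one of them.
+        memtable
+            .write_bulk(build_metrics_bulk_part("a", &[1], &[None], &[Some(1.0)], 0))
+            .unwrap();
+        let iter = memtable.iter(None, None, None).unwrap();
+        let total_rows = iter.map(|b| b.unwrap().num_rows()).sum::<usize>();
+        assert_eq!(1, total_rows);
+    }
+}

The new `iter()` above wires the bulk read path together: every `BulkPart` yields a per-part reader, the sync `MergeReader` performs a k-way merge over them, and a `DedupReader` driven by `LastRow` or `LastNonNull` collapses duplicates according to the region's merge mode. A minimal sketch of that merge-then-dedup shape on plain tuples; every name here is illustrative, not part of the mito2 API:

```rust
// Toy version of "merge several sorted sources, then keep one row per key".
// The real MergeReader streams a k-way merge instead of materializing everything,
// and LastRow picks the row with the highest sequence rather than part order.
fn merge_then_dedup(sources: Vec<Vec<(u64, &'static str)>>) -> Vec<(u64, &'static str)> {
    // "Merge": flatten and sort by timestamp; the sort is stable, so rows from
    // later parts stay after rows from earlier parts for equal timestamps.
    let mut merged: Vec<(u64, &'static str)> = sources.into_iter().flatten().collect();
    merged.sort_by_key(|(ts, _)| *ts);

    // "Dedup": adjacent rows with the same timestamp collapse into one.
    let mut out: Vec<(u64, &'static str)> = Vec::new();
    for row in merged {
        match out.last_mut() {
            Some(last) if last.0 == row.0 => *last = row, // keep the later row
            _ => out.push(row),
        }
    }
    out
}

fn main() {
    let parts = vec![vec![(1, "a@part1")], vec![(1, "a@part2"), (2, "b")]];
    assert_eq!(merge_then_dedup(parts), vec![(1, "a@part2"), (2, "b")]);
}
```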
diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 1a8ee58b6a..8e093cd56e 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -56,7 +56,7 @@ use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::to_sst_arrow_schema;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BulkPart {
     pub(crate) data: Bytes,
     metadata: BulkPartMeta,
@@ -95,7 +95,7 @@ impl BulkPart {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BulkPartMeta {
     /// Total rows in part.
     pub num_rows: usize,
@@ -219,7 +219,6 @@ fn mutations_to_record_batch(
 
     for row in key_values.iter() {
         assert_eq!(1, row.num_primary_keys());
-        assert_eq!(1, row.num_fields());
 
         let first_primary_key_col = row.primary_keys().next().unwrap();
         let bytes = match first_primary_key_col {
@@ -282,7 +281,7 @@ struct ArraysSorter<I> {
 
 impl<I> ArraysSorter<I>
 where
-    I: Iterator<Item = i64>,
+    I: Iterator<Item = i64>,
 {
     /// Converts arrays to record batch.
     fn sort(self) -> Result<(RecordBatch, i64, i64)> {
@@ -330,10 +329,10 @@ where
                     check_bounds: false,
                 }),
            )
-            .context(ComputeArrowSnafu)?
-            .as_any()
-            .downcast_ref::<BinaryArray>()
-            .unwrap(),
+            .context(ComputeArrowSnafu)?
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap(),
         )?) as ArrayRef;
 
         let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
@@ -346,7 +345,7 @@ where
                     check_bounds: false,
                 }),
             )
-            .context(ComputeArrowSnafu)?,
+            .context(ComputeArrowSnafu)?,
         );
     }
 
@@ -357,7 +356,7 @@ where
                 check_bounds: false,
             }),
         )
-        .context(ComputeArrowSnafu)?;
+        .context(ComputeArrowSnafu)?;
 
         arrays.push(timestamp);
         arrays.push(pk_dictionary);
@@ -369,7 +368,7 @@ where
                     check_bounds: false,
                 }),
             )
-            .context(ComputeArrowSnafu)?,
+            .context(ComputeArrowSnafu)?,
         );
 
         arrays.push(
@@ -380,7 +379,7 @@ where
                     check_bounds: false,
                 }),
            )
-            .context(ComputeArrowSnafu)?,
+            .context(ComputeArrowSnafu)?,
         );
 
         let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
@@ -392,7 +391,7 @@ where
 fn timestamp_array_to_iter(
     timestamp_unit: TimeUnit,
     timestamp: &ArrayRef,
-) -> impl Iterator<Item = i64> {
+) -> impl Iterator<Item = i64> {
     match timestamp_unit {
         // safety: timestamp column must be valid.
         TimeUnit::Second => timestamp
@@ -454,7 +453,7 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use std::collections::VecDeque;
 
     use datafusion_common::ScalarValue;
@@ -520,19 +519,19 @@ mod tests {
         );
     }
 
-    struct MutationInput<'a> {
-        k0: &'a str,
-        k1: u32,
-        timestamps: &'a [i64],
-        v1: &'a [Option<f64>],
-        sequence: u64,
+    pub(crate) struct MutationInput<'a> {
+        pub(crate) k0: &'a str,
+        pub(crate) k1: u32,
+        pub(crate) timestamps: &'a [i64],
+        pub(crate) v1: &'a [Option<f64>],
+        pub(crate) sequence: u64,
     }
 
     #[derive(Debug, PartialOrd, PartialEq)]
-    struct BatchOutput<'a> {
-        pk_values: &'a [Value],
-        timestamps: &'a [i64],
-        v1: &'a [Option<f64>],
+    pub(crate) struct BatchOutput<'a> {
+        pub(crate) pk_values: &'a [Value],
+        pub(crate) timestamps: &'a [i64],
+        pub(crate) v1: &'a [Option<f64>],
     }
 
     fn check_mutations_to_record_batches(
@@ -553,7 +552,7 @@ mod tests {
                     m.v1.iter().copied(),
                     m.sequence,
                 )
-                .mutation
+                .mutation
             })
             .collect::<Vec<_>>();
         let total_rows: usize = mutations
@@ -761,7 +760,7 @@ mod tests {
         );
     }
 
-    fn encode(input: &[MutationInput]) -> BulkPart {
+    pub(crate) fn encode(input: &[MutationInput]) -> BulkPart {
         let metadata = metadata_for_test();
         let mutations = input
             .iter()
@@ -774,7 +773,7 @@ mod tests {
                     m.v1.iter().copied(),
                     m.sequence,
                 )
-                .mutation
+                .mutation
             })
            .collect::<Vec<_>>();
         let encoder = BulkPartEncoder::new(metadata, true, 1024);
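`ArraysSorter` in this file sorts every column by applying one permutation, derived from the key and timestamp order, to all arrays through arrow's `take` kernel, with `check_bounds: false` because the indices are known to be in range. A standalone sketch of that permutation pattern using arrow's public kernels; it assumes a recent `arrow` crate, and exact signatures may vary by version:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{sort_to_indices, take, TakeOptions};

fn main() {
    let ts: ArrayRef = Arc::new(Int64Array::from(vec![3, 1, 2]));
    let names: ArrayRef = Arc::new(StringArray::from(vec!["c", "a", "b"]));

    // One permutation computed from the timestamp column...
    let indices = sort_to_indices(&ts, None, None).unwrap();

    // ...applied to every column. `check_bounds: false` mirrors the sorter in
    // this file: the indices come from the arrays themselves, so they are valid.
    let opts = || Some(TakeOptions { check_bounds: false });
    let sorted_ts = take(&ts, &indices, opts()).unwrap();
    let sorted_names = take(&names, &indices, opts()).unwrap();

    let ts_values: Vec<i64> = sorted_ts
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap()
        .iter()
        .map(|v| v.unwrap())
        .collect();
    assert_eq!(ts_values, vec![1, 2, 3]);
    assert_eq!(sorted_names.len(), 3);
}
```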
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 80e3a269c5..9f493a4c95 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -912,7 +912,7 @@ impl IterBuilder for TimeSeriesIterBuilder {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use std::collections::{HashMap, HashSet};
 
     use api::helper::ColumnDataTypeWrapper;
@@ -929,7 +929,7 @@ mod tests {
     use super::*;
     use crate::row_converter::SortField;
 
-    fn schema_for_test() -> RegionMetadataRef {
+    pub(crate) fn schema_for_test() -> RegionMetadataRef {
         let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
         builder
             .push_column_metadata(ColumnMetadata {
@@ -1143,7 +1143,7 @@ mod tests {
         )
     }
 
-    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
+    pub(crate) fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
         let column_schema = schema
             .column_metadatas
             .iter()
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 2f9bdff7b0..1d4484b013 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -26,6 +26,8 @@ pub(crate) mod scan_util;
 pub(crate) mod seq_scan;
 pub(crate) mod unordered_scan;
 
+pub(crate) mod sync;
+
 use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;
diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs
index 8060c53405..b3da4b1d95 100644
--- a/src/mito2/src/read/merge.rs
+++ b/src/mito2/src/read/merge.rs
@@ -284,23 +284,23 @@ impl MergeReaderBuilder {
 
 /// Metrics for the merge reader.
 #[derive(Debug, Default)]
-struct Metrics {
+pub(crate) struct Metrics {
     /// Total scan cost of the reader.
-    scan_cost: Duration,
+    pub(crate) scan_cost: Duration,
     /// Number of times to fetch batches.
-    num_fetch_by_batches: usize,
+    pub(crate) num_fetch_by_batches: usize,
     /// Number of times to fetch rows.
-    num_fetch_by_rows: usize,
+    pub(crate) num_fetch_by_rows: usize,
     /// Number of input rows.
-    num_input_rows: usize,
+    pub(crate) num_input_rows: usize,
     /// Number of output rows.
-    num_output_rows: usize,
+    pub(crate) num_output_rows: usize,
     /// Cost to fetch batches from sources.
-    fetch_cost: Duration,
+    pub(crate) fetch_cost: Duration,
 }
 
 /// A `Node` represent an individual input data source to be merged.
-struct Node {
+pub(crate) struct Node {
     /// Data source of this `Node`.
     source: Source,
     /// Current batch to be read. The node ensures the batch is not empty.
@@ -313,7 +313,7 @@ impl Node {
     /// Initialize a node.
     ///
     /// It tries to fetch one batch from the `source`.
-    async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
+    pub(crate) async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
         // Ensures batch is not empty.
         let start = Instant::now();
         let current_batch = source.next_batch().await?.map(CompareFirst);
@@ -432,7 +432,7 @@ impl Ord for Node {
 /// Type to compare [Batch] by first row.
 ///
 /// It ignores op type as sequence is enough to distinguish different rows.
-struct CompareFirst(Batch);
+pub(crate) struct CompareFirst(pub(crate) Batch);
 
 impl PartialEq for CompareFirst {
     fn eq(&self, other: &Self) -> bool {
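`CompareFirst` exists so that heap ordering only inspects a batch's first row, and `Node`'s `Ord` is deliberately reversed so that std's max-heap `BinaryHeap` pops the smallest batch first. A toy version of that newtype trick; `ToyBatch` and `ByFirstRow` are illustrative stand-ins, not mito2 types:

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Stand-in for `Batch`: rows of (primary_key, timestamp).
#[derive(Debug, PartialEq, Eq)]
struct ToyBatch(Vec<(u8, i64)>);

// Like `CompareFirst`: order whole batches by their first row only.
struct ByFirstRow(ToyBatch);

impl PartialEq for ByFirstRow {
    fn eq(&self, other: &Self) -> bool {
        self.0 .0.first() == other.0 .0.first()
    }
}
impl Eq for ByFirstRow {}
impl PartialOrd for ByFirstRow {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for ByFirstRow {
    // Reversed comparison: std's BinaryHeap is a max-heap, so flipping the
    // operands turns it into a min-heap, exactly like `Node`'s `Ord` here.
    fn cmp(&self, other: &Self) -> Ordering {
        other.0 .0.first().cmp(&self.0 .0.first())
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(ByFirstRow(ToyBatch(vec![(1, 30)])));
    heap.push(ByFirstRow(ToyBatch(vec![(1, 10)])));
    heap.push(ByFirstRow(ToyBatch(vec![(1, 20)])));
    // Pops in ascending order of the first row.
    assert_eq!(heap.pop().unwrap().0, ToyBatch(vec![(1, 10)]));
    assert_eq!(heap.pop().unwrap().0, ToyBatch(vec![(1, 20)]));
}
```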
diff --git a/src/mito2/src/read/sync.rs b/src/mito2/src/read/sync.rs
new file mode 100644
index 0000000000..0fb83f6002
--- /dev/null
+++ b/src/mito2/src/read/sync.rs
@@ -0,0 +1,16 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod merge;
+pub mod dedup;
\ No newline at end of file
diff --git a/src/mito2/src/read/sync/dedup.rs b/src/mito2/src/read/sync/dedup.rs
new file mode 100644
index 0000000000..77d19aa98c
--- /dev/null
+++ b/src/mito2/src/read/sync/dedup.rs
@@ -0,0 +1,84 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Sync dedup reader implementation.
+
+use common_telemetry::debug;
+
+use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
+use crate::read::dedup::{DedupMetrics, DedupStrategy};
+use crate::read::Batch;
+
+/// A sync version of the reader that dedups sorted batches from a source based on the
+/// dedup strategy.
+pub(crate) struct DedupReader<R, S> {
+    source: R,
+    strategy: S,
+    metrics: DedupMetrics,
+}
+
+impl<R, S> DedupReader<R, S> {
+    /// Creates a new dedup reader.
+    pub(crate) fn new(source: R, strategy: S) -> Self {
+        Self {
+            source,
+            strategy,
+            metrics: DedupMetrics::default(),
+        }
+    }
+}
+
+impl<R: Iterator<Item = crate::error::Result<Batch>>, S: DedupStrategy> DedupReader<R, S> {
+    /// Returns the next deduplicated batch.
+    fn fetch_next_batch(&mut self) -> Option<crate::error::Result<Batch>> {
+        while let Some(res) = self.source.next() {
+            match res {
+                Ok(batch) => {
+                    if let Some(batch) = self
+                        .strategy
+                        .push_batch(batch, &mut self.metrics)
+                        .transpose()
+                    {
+                        return Some(batch);
+                    }
+                }
+                Err(err) => return Some(Err(err)),
+            }
+        }
+        self.strategy.finish(&mut self.metrics).transpose()
+    }
+}
+
+impl<R: Iterator<Item = crate::error::Result<Batch>>, S: DedupStrategy> Iterator
+    for DedupReader<R, S>
+{
+    type Item = crate::error::Result<Batch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.fetch_next_batch()
+    }
+}
+
+impl<R, S> Drop for DedupReader<R, S> {
+    fn drop(&mut self) {
+        debug!("Sync dedup reader finished, metrics: {:?}", self.metrics);
+
+        MERGE_FILTER_ROWS_TOTAL
+            .with_label_values(&["dedup"])
+            .inc_by(self.metrics.num_unselected_rows as u64);
+        MERGE_FILTER_ROWS_TOTAL
+            .with_label_values(&["delete"])
+            .inc_by(self.metrics.num_deleted_rows as u64);
+    }
+}
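The sync `DedupReader` is a plain iterator adapter: it pulls batches from an inner iterator, pushes each one through a `DedupStrategy`, and flushes whatever the strategy buffered once the source is exhausted. A toy sketch of that same push/finish contract on `(timestamp, value)` tuples; `ToyStrategy` and `FirstSeen` are illustrative, not the real trait, which works on `Batch` and `DedupMetrics`:

```rust
// A strategy receives rows in sorted order, may buffer state, and emits at most
// one row per push plus a final flush, mirroring `DedupStrategy::push_batch`/`finish`.
trait ToyStrategy {
    fn push(&mut self, row: (i64, &'static str)) -> Option<(i64, &'static str)>;
    fn finish(&mut self) -> Option<(i64, &'static str)>;
}

// Keeps the first row seen for each timestamp, like `LastRow` keeps the row with
// the highest sequence (the merge order guarantees that row arrives first).
struct FirstSeen {
    pending: Option<(i64, &'static str)>,
}

impl ToyStrategy for FirstSeen {
    fn push(&mut self, row: (i64, &'static str)) -> Option<(i64, &'static str)> {
        match self.pending {
            Some(prev) if prev.0 == row.0 => None, // duplicate timestamp, drop it
            _ => self.pending.replace(row),        // emit the buffered row, buffer this one
        }
    }
    fn finish(&mut self) -> Option<(i64, &'static str)> {
        self.pending.take()
    }
}

fn main() {
    let sorted = [(1, "seq=2"), (1, "seq=1"), (2, "x")];
    let mut strategy = FirstSeen { pending: None };
    let mut out: Vec<_> = sorted.into_iter().filter_map(|r| strategy.push(r)).collect();
    out.extend(strategy.finish());
    assert_eq!(out, vec![(1, "seq=2"), (2, "x")]);
}
```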
diff --git a/src/mito2/src/read/sync/merge.rs b/src/mito2/src/read/sync/merge.rs
new file mode 100644
index 0000000000..1a8f5481fd
--- /dev/null
+++ b/src/mito2/src/read/sync/merge.rs
@@ -0,0 +1,384 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Sync merge reader implementation.
+
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::time::Instant;
+
+use common_telemetry::debug;
+
+use crate::error;
+use crate::memtable::BoxedBatchIterator;
+use crate::metrics::READ_STAGE_ELAPSED;
+use crate::read::merge::{CompareFirst, Metrics};
+use crate::read::Batch;
+
+/// A `Node` represents an individual input data source to be merged.
+pub(crate) struct Node {
+    /// Data source of this `Node`.
+    source: BoxedBatchIterator,
+    /// Current batch to be read. The node ensures the batch is not empty.
+    ///
+    /// `None` means the `source` has reached EOF.
+    current_batch: Option<CompareFirst>,
+}
+
+impl Node {
+    /// Initializes a node.
+    ///
+    /// It tries to fetch one batch from the `source`.
+    pub(crate) fn new(
+        mut source: BoxedBatchIterator,
+        metrics: &mut Metrics,
+    ) -> error::Result<Node> {
+        // Ensures batch is not empty.
+        let start = Instant::now();
+        let current_batch = source.next().transpose()?.map(CompareFirst);
+        metrics.fetch_cost += start.elapsed();
+        metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);
+
+        Ok(Node {
+            source,
+            current_batch,
+        })
+    }
+
+    /// Returns whether the node still has batches to read.
+    fn is_eof(&self) -> bool {
+        self.current_batch.is_none()
+    }
+
+    /// Returns the primary key of the current batch.
+    ///
+    /// # Panics
+    /// Panics if the node has reached EOF.
+    fn primary_key(&self) -> &[u8] {
+        self.current_batch().primary_key()
+    }
+
+    /// Returns the current batch.
+    ///
+    /// # Panics
+    /// Panics if the node has reached EOF.
+    fn current_batch(&self) -> &Batch {
+        &self.current_batch.as_ref().unwrap().0
+    }
+
+    /// Returns the current batch and fetches the next batch from the source.
+    ///
+    /// # Panics
+    /// Panics if the node has reached EOF.
+    fn fetch_batch(&mut self, metrics: &mut Metrics) -> error::Result<Batch> {
+        let current = self.current_batch.take().unwrap();
+        let start = Instant::now();
+        // Ensures batch is not empty.
+        self.current_batch = self.source.next().transpose()?.map(CompareFirst);
+        metrics.fetch_cost += start.elapsed();
+        metrics.num_input_rows += self
+            .current_batch
+            .as_ref()
+            .map(|b| b.0.num_rows())
+            .unwrap_or(0);
+        Ok(current.0)
+    }
+
+    /// Returns true if the key range of the current batch in `self` is behind (exclusive) the
+    /// current batch in `other`.
+    ///
+    /// # Panics
+    /// Panics if either `self` or `other` is EOF.
+    fn is_behind(&self, other: &Node) -> bool {
+        debug_assert!(!self.current_batch().is_empty());
+        debug_assert!(!other.current_batch().is_empty());
+
+        // We only compare the primary key and timestamp so nodes in the cold
+        // heap don't have overlapping timestamps with the hottest node
+        // in the hot heap.
+        self.primary_key().cmp(other.primary_key()).then_with(|| {
+            self.current_batch()
+                .first_timestamp()
+                .cmp(&other.current_batch().last_timestamp())
+        }) == Ordering::Greater
+    }
+
+    /// Skips the first `num_to_skip` rows of the node's current batch. If the current batch
+    /// becomes empty, fetches the next batch from the node.
+    ///
+    /// # Panics
+    /// Panics if the node is EOF.
+    fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> error::Result<()> {
+        let batch = self.current_batch();
+        debug_assert!(batch.num_rows() >= num_to_skip);
+
+        let remaining = batch.num_rows() - num_to_skip;
+        if remaining == 0 {
+            // Nothing remains, we need to fetch next batch to ensure the batch is not empty.
+            self.fetch_batch(metrics)?;
+        } else {
+            debug_assert!(!batch.is_empty());
+            self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining)));
+        }
+
+        Ok(())
+    }
+}
+
+impl PartialEq for Node {
+    fn eq(&self, other: &Node) -> bool {
+        self.current_batch == other.current_batch
+    }
+}
+
+impl Eq for Node {}
+
+impl PartialOrd for Node {
+    fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for Node {
+    fn cmp(&self, other: &Node) -> Ordering {
+        // The std binary heap is a max heap, but we want the nodes ordered in
+        // ascending order, so we compare the nodes in reverse order.
+        other.current_batch.cmp(&self.current_batch)
+    }
+}
+
+/// Reader to merge sorted batches.
+///
+/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
+/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
+///    ignore op type as sequence is already unique).
+/// 2. Batches from sources **must** not be empty.
+///
+/// The reader won't concatenate batches. Each batch returned by the reader also doesn't
+/// contain duplicate rows. But the last (primary key, timestamp) of a batch may be the same
+/// as the first one in the next batch.
+pub struct MergeReader {
+    /// Holds [Node]s whose key range of the current batch **is** overlapped with the merge window.
+    /// Each node yields batches from a `source`.
+    ///
+    /// [Node] in this heap **must** not be empty. A `merge window` is the (primary key, timestamp)
+    /// range of the **root node** in the `hot` heap.
+    hot: BinaryHeap<Node>,
+    /// Holds [Node]s whose key range of the current batch **isn't** overlapped with the merge window.
+    ///
+    /// [Node] in this heap **must** not be empty.
+    cold: BinaryHeap<Node>,
+    /// Batch to output.
+    output_batch: Option<Batch>,
+    /// Local metrics.
+    metrics: Metrics,
+}
+
+impl Drop for MergeReader {
+    fn drop(&mut self) {
+        debug!("Merge reader (sync) finished, metrics: {:?}", self.metrics);
+
+        READ_STAGE_ELAPSED
+            .with_label_values(&["merge"])
+            .observe(self.metrics.scan_cost.as_secs_f64());
+        READ_STAGE_ELAPSED
+            .with_label_values(&["merge_fetch"])
+            .observe(self.metrics.fetch_cost.as_secs_f64());
+    }
+}
+
+impl Iterator for MergeReader {
+    type Item = error::Result<Batch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let start = Instant::now();
+        while !self.hot.is_empty() && self.output_batch.is_none() {
+            if self.hot.len() == 1 {
+                // No need to do merge sort if only one batch in the hot heap.
+                if let Err(e) = self.fetch_batch_from_hottest() {
+                    return Some(Err(e));
+                }
+                self.metrics.num_fetch_by_batches += 1;
+            } else {
+                // We can only fetch rows from the hottest node that are less than the next node.
+                if let Err(e) = self.fetch_rows_from_hottest() {
+                    return Some(Err(e));
+                }
+                self.metrics.num_fetch_by_rows += 1;
+            }
+        }
+
+        if let Some(batch) = self.output_batch.take() {
+            self.metrics.scan_cost += start.elapsed();
+            self.metrics.num_output_rows += batch.num_rows();
+            Some(Ok(batch))
+        } else {
+            // Nothing fetched.
+            self.metrics.scan_cost += start.elapsed();
+            None
+        }
+    }
+}
+
+impl MergeReader {
+    /// Creates and initializes a new [MergeReader].
+    pub fn new(sources: Vec<BoxedBatchIterator>) -> error::Result<MergeReader> {
+        let start = Instant::now();
+        let mut metrics = Metrics::default();
+
+        let mut cold = BinaryHeap::with_capacity(sources.len());
+        let hot = BinaryHeap::with_capacity(sources.len());
+        for source in sources {
+            let node = Node::new(source, &mut metrics)?;
+            if !node.is_eof() {
+                // Ensures `cold` doesn't have EOF nodes.
+                cold.push(node);
+            }
+        }
+
+        let mut reader = MergeReader {
+            hot,
+            cold,
+            output_batch: None,
+            metrics,
+        };
+        // Initializes the reader.
+        reader.refill_hot();
+
+        reader.metrics.scan_cost += start.elapsed();
+        Ok(reader)
+    }
+
+    /// Moves nodes in the `cold` heap whose key ranges overlap with the current merge
+    /// window to the `hot` heap.
+    fn refill_hot(&mut self) {
+        while !self.cold.is_empty() {
+            if let Some(merge_window) = self.hot.peek() {
+                let warmest = self.cold.peek().unwrap();
+                if warmest.is_behind(merge_window) {
+                    // If the warmest node in the `cold` heap is totally behind the
+                    // `merge_window`, there is no need to add more nodes into the `hot`
+                    // heap for merge sorting.
+                    break;
+                }
+            }
+
+            let warmest = self.cold.pop().unwrap();
+            self.hot.push(warmest);
+        }
+    }
+
+    /// Fetches one batch from the hottest node.
+    fn fetch_batch_from_hottest(&mut self) -> error::Result<()> {
+        assert_eq!(1, self.hot.len());
+
+        let mut hottest = self.hot.pop().unwrap();
+        let batch = hottest.fetch_batch(&mut self.metrics)?;
+        Self::maybe_output_batch(batch, &mut self.output_batch)?;
+        self.reheap(hottest)
+    }
+
+    /// Fetches non-duplicated rows from the hottest node.
+    fn fetch_rows_from_hottest(&mut self) -> error::Result<()> {
+        // Safety: the caller ensures the hot heap has more than one element.
+        // Pop the hottest node.
+        let mut top_node = self.hot.pop().unwrap();
+        let top = top_node.current_batch();
+        // Min timestamp and its sequence in the next batch.
+        let next_min_ts = {
+            let next_node = self.hot.peek().unwrap();
+            let next = next_node.current_batch();
+            // `top` and `next` have overlapping rows so they must have the same primary key.
+            debug_assert_eq!(top.primary_key(), next.primary_key());
+            // Safety: batches in the heap are not empty, so we can use unwrap here.
+            next.first_timestamp().unwrap()
+        };
+
+        // Safety: batches in the heap are not empty, so we can use unwrap here.
+        let timestamps = top.timestamps_native().unwrap();
+        // Binary searches the timestamp in the top batch.
+        // Safety: batches should have the same timestamp resolution so we can compare the native
+        // values directly.
+        let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) {
+            Ok(pos) => pos,
+            Err(pos) => {
+                // No duplicate timestamp. Outputs timestamps before `pos`.
+                Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
+                top_node.skip_rows(pos, &mut self.metrics)?;
+                return self.reheap(top_node);
+            }
+        };
+
+        // Duplicate timestamp found.
+        let output_end = if duplicate_pos == 0 {
+            // The first timestamp of the top node is a duplicate. We can simply return the first
+            // row as the heap ensures it is the one with the largest sequence.
+            1
+        } else {
+            // We don't know which one has the larger sequence so we use the range before
+            // the duplicate pos.
+            duplicate_pos
+        };
+        Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
+        top_node.skip_rows(output_end, &mut self.metrics)?;
+        self.reheap(top_node)
+    }
+
+    /// Pushes the node popped from `hot` back to a proper heap.
+    fn reheap(&mut self, node: Node) -> error::Result<()> {
+        if node.is_eof() {
+            // If the node is EOF, don't put it into the heap again.
+            // The merge window would be updated, so we need to refill the hot heap.
+            self.refill_hot();
+        } else {
+            // Find a proper heap for this node.
+            let node_is_cold = if let Some(hottest) = self.hot.peek() {
+                // If the key range of this node is behind the hottest node's, we can
+                // push it to the cold heap. Otherwise we push it to the hot heap.
+                node.is_behind(hottest)
+            } else {
+                // The hot heap is empty, but we don't know whether the current
+                // batch of this node is still the hottest.
+                true
+            };
+
+            if node_is_cold {
+                self.cold.push(node);
+            } else {
+                self.hot.push(node);
+            }
+            // Either way, the merge window has changed, so we need to refill the hot heap.
+            self.refill_hot();
+        }
+
+        Ok(())
+    }
+
+    /// Sets `batch` as the `output_batch`.
+    ///
+    /// Ignores the `batch` if it is empty.
+    fn maybe_output_batch(
+        batch: Batch,
+        output_batch: &mut Option<Batch>,
+    ) -> error::Result<()> {
+        debug_assert!(output_batch.is_none());
+        if batch.is_empty() {
+            return Ok(());
+        }
+        *output_batch = Some(batch);
+
+        Ok(())
+    }
+}
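The trickiest part of `fetch_rows_from_hottest` is deciding how many rows of the hottest batch are safe to emit before a row from the next batch could interleave. A sketch of just that decision on plain timestamp slices; `rows_to_output` is an illustrative helper, not the real code:

```rust
// `top` holds the hottest batch's sorted timestamps; `next_min_ts` is the smallest
// timestamp of the second-hottest batch (same primary key, by construction).
fn rows_to_output(top: &[i64], next_min_ts: i64) -> usize {
    match top.binary_search(&next_min_ts) {
        // No duplicate: output everything strictly before the insertion point.
        Err(pos) => pos,
        // Duplicate at index 0: the heap already guarantees the top batch holds
        // the row with the largest sequence, so exactly one row can be emitted.
        Ok(0) => 1,
        // Duplicate later on: emit the prefix before it and let the heap decide
        // the duplicate itself on the next round.
        Ok(pos) => pos,
    }
}

fn main() {
    assert_eq!(rows_to_output(&[1, 2, 3], 5), 3); // no overlap: take the whole batch
    assert_eq!(rows_to_output(&[1, 2, 3], 3), 2); // duplicate at index 2: stop before it
    assert_eq!(rows_to_output(&[3, 4, 5], 3), 1); // duplicate at index 0: emit one row
}
```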