feat(mito): merge reader for mito2 (#2210)

* feat: Implement slice and first/last timestamp for Batch

* feat(mito): implements sort/concat for Batch

* chore: fix typo

* chore: remove comments

* feat: sort and dedup

* test: test batch operations

* chore: cast enum to test op type

* test: test filter related api

* style: fix clippy

* feat: implement Node and CompareFirst

* feat: merge reader wip

* feat: merge wip

* feat: use batch's operation to sort and dedup

* feat: implement BatchReader for MergeReader

* feat: simplify code

* test: test merge reader

* refactor: use test util to create batch

* refactor: remove unused imports

* feat: update comment

* chore: remove metadata() from Source

* chore: update comment

* feat: source supports batch iterator

* chore: update comment
Author: Yingwen
Date: 2023-08-24 11:37:51 +08:00 (committed by GitHub)
Parent: e5ba3d1708
Commit: 4ee1034012
5 changed files with 596 additions and 50 deletions
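
For orientation, here is a minimal usage sketch of the MergeReaderBuilder API this commit introduces; `sst_reader` and `mem_iter` are hypothetical placeholders for any BoxedBatchReader and BoxedBatchIterator:

let mut merged = MergeReaderBuilder::new()
    .push_batch_reader(Box::new(sst_reader)) // any BoxedBatchReader
    .push_batch_iter(Box::new(mem_iter)) // any BoxedBatchIterator
    .build()
    .await?;
while let Some(batch) = merged.next_batch().await? {
    // Each batch holds merged, deduplicated rows for one primary key,
    // with deleted rows already filtered out.
}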


@@ -35,7 +35,7 @@ use crate::read::Batch;
/// Should be unique under the same region.
pub type MemtableId = u32;
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>>>;
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send + Sync>;
/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {


@@ -14,6 +14,8 @@
//! Common structs and utilities for reading data.
pub mod merge;
use std::sync::Arc;
use api::v1::OpType;
@@ -29,12 +31,12 @@ use datatypes::vectors::{
BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
/// Storage internal representation of a batch of rows
/// for a primary key (time series).
@@ -109,7 +111,7 @@ impl Batch {
self.num_rows() == 0
}
/// Returns the first timestamp in the batch.
/// Returns the first timestamp in the batch or `None` if the batch is empty.
pub fn first_timestamp(&self) -> Option<Timestamp> {
if self.timestamps.is_empty() {
return None;
@@ -118,7 +120,7 @@ impl Batch {
Some(self.get_timestamp(0))
}
/// Returns the last timestamp in the batch.
/// Returns the last timestamp in the batch or `None` if the batch is empty.
pub fn last_timestamp(&self) -> Option<Timestamp> {
if self.timestamps.is_empty() {
return None;
@@ -554,20 +556,23 @@ pub struct SourceStats {
/// Async [Batch] reader and iterator wrapper.
///
/// This is the data source for SST writers or internal readers.
pub enum Source {}
pub enum Source {
/// Source from a [BoxedBatchReader].
Reader(BoxedBatchReader),
/// Source from a [BoxedBatchIterator].
Iter(BoxedBatchIterator),
}
impl Source {
/// Returns next [Batch] from this data source.
pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
unimplemented!()
match self {
Source::Reader(reader) => reader.next_batch().await,
Source::Iter(iter) => iter.next().transpose(),
}
}
/// Returns the metadata of the source region.
pub(crate) fn metadata(&self) -> RegionMetadataRef {
unimplemented!()
}
// TODO(yingwen): Maybe remove this method.
// TODO(yingwen): Remove this method once we support collecting stats in the writer.
/// Returns statistics of fetched batches.
pub(crate) fn stats(&self) -> SourceStats {
unimplemented!()
@@ -603,38 +608,9 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
#[cfg(test)]
mod tests {
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use super::*;
use crate::error::Error;
fn new_batch_builder(
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
field: &[u64],
) -> BatchBuilder {
let mut builder = BatchBuilder::new(b"test".to_vec());
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap()
.push_field_array(
1,
Arc::new(UInt64Array::from_iter_values(field.iter().copied())),
)
.unwrap();
builder
}
use crate::test_util::new_batch_builder;
fn new_batch(
timestamps: &[i64],
@@ -642,7 +618,7 @@ mod tests {
op_types: &[OpType],
field: &[u64],
) -> Batch {
new_batch_builder(timestamps, sequences, op_types, field)
new_batch_builder(b"test", timestamps, sequences, op_types, field)
.build()
.unwrap()
}
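
With Source now implemented, both variants are drained through the same next_batch call; a crate-internal sketch, assuming `batches` is a Vec<Batch>:

let mut source = Source::Iter(Box::new(batches.into_iter().map(Ok)));
while let Some(batch) = source.next_batch().await? {
    // Reader and iterator sources are consumed uniformly here.
}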

src/mito2/src/read/merge.rs (new file, 492 lines)

@@ -0,0 +1,492 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Merge reader implementation.
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::mem;
use async_trait::async_trait;
use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
/// Reader to merge sorted batches.
///
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
/// ignore op type as sequence is already unique).
/// 2. Batch doesn't have duplicate elements (elements with the same primary key and time index).
pub struct MergeReader {
/// Holds a min-heap for all [Node]s. Each node yields batches from a `source`.
///
/// `Node` in this heap **must** not be EOF.
nodes: BinaryHeap<Node>,
/// Batches for the next primary key.
batch_merger: BatchMerger,
}
#[async_trait]
impl BatchReader for MergeReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
// Collect batches from sources for the same primary key and return
// the collected batch.
while !self.nodes.is_empty() {
// Peek current key.
let Some(current_key) = self.batch_merger.primary_key() else {
// The merger is empty, so we can push any batch into it directly.
self.take_batch_from_heap().await?;
// Try next node.
continue;
};
// If the next node has a different key, we have finished collecting the current key.
// Safety: `nodes` is not empty.
if self.nodes.peek().unwrap().primary_key() != current_key {
break;
}
// They have the same primary key, so we take it and try the next node.
self.take_batch_from_heap().await?;
}
// Merge collected batches.
self.batch_merger.merge_batches()
}
}
impl MergeReader {
/// Creates a new [MergeReader].
pub async fn new(sources: Vec<Source>) -> Result<MergeReader> {
let mut nodes = BinaryHeap::with_capacity(sources.len());
for source in sources {
let node = Node::new(source).await?;
if !node.is_eof() {
// Ensure `nodes` doesn't contain EOF nodes.
nodes.push(node);
}
}
Ok(MergeReader {
nodes,
batch_merger: BatchMerger::new(),
})
}
/// Takes a batch from the top of the heap and re-heapifies.
async fn take_batch_from_heap(&mut self) -> Result<()> {
let mut next_node = self.nodes.pop().unwrap();
let batch = next_node.fetch_batch().await?;
self.batch_merger.push(batch);
// Insert the node back into the heap.
// If the node reaches EOF, ignore it. This ensures nodes in the heap are never EOF.
if next_node.is_eof() {
return Ok(());
}
self.nodes.push(next_node);
Ok(())
}
}
/// Builder to build and initialize a [MergeReader].
#[derive(Default)]
pub struct MergeReaderBuilder {
/// Input sources.
///
/// All sources must yield batches with the same schema.
sources: Vec<Source>,
}
impl MergeReaderBuilder {
/// Returns an empty builder.
pub fn new() -> MergeReaderBuilder {
MergeReaderBuilder::default()
}
/// Pushes a batch reader to sources.
pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self {
self.sources.push(Source::Reader(reader));
self
}
/// Pushes a batch iterator to sources.
pub fn push_batch_iter(&mut self, iter: BoxedBatchIterator) -> &mut Self {
self.sources.push(Source::Iter(iter));
self
}
/// Builds and initializes the reader, then resets the builder.
pub async fn build(&mut self) -> Result<MergeReader> {
let sources = mem::take(&mut self.sources);
MergeReader::new(sources).await
}
}
/// Helper to merge batches for the same primary key.
struct BatchMerger {
/// Buffered non-empty batches to merge.
batches: Vec<Batch>,
/// Whether the batch buffer is still sorted.
is_sorted: bool,
}
impl BatchMerger {
/// Returns an empty merger.
fn new() -> BatchMerger {
BatchMerger {
batches: Vec::new(),
is_sorted: true, // An empty merger is always sorted.
}
}
/// Returns the primary key of the current merger, or `None` if the merger is empty.
fn primary_key(&self) -> Option<&[u8]> {
self.batches.first().map(|batch| batch.primary_key())
}
/// Pushes a `batch` into the merger.
///
/// Ignores the `batch` if it is empty.
///
/// # Panics
/// Panics if the `batch` has a different primary key.
fn push(&mut self, batch: Batch) {
if batch.is_empty() {
return;
}
if self.batches.is_empty() || !self.is_sorted {
// Merger is empty or is not sorted, we can push the batch directly.
self.batches.push(batch);
return;
}
// Merger is sorted, check whether we can still preserve the sorted state.
let last_batch = self.batches.last().unwrap();
assert_eq!(last_batch.primary_key(), batch.primary_key());
match last_batch.last_timestamp().cmp(&batch.first_timestamp()) {
Ordering::Less => {
// Still sorted.
self.batches.push(batch);
return;
}
Ordering::Equal => {
// Check sequence.
if last_batch.last_sequence() > batch.first_sequence() {
// Still sorted.
self.batches.push(batch);
return;
}
}
Ordering::Greater => (),
}
// Merger is no longer sorted.
self.batches.push(batch);
self.is_sorted = false;
}
/// Merges all buffered batches, returns the merged batch, and then
/// resets the buffer.
fn merge_batches(&mut self) -> Result<Option<Batch>> {
if self.batches.is_empty() {
return Ok(None);
}
let batches = mem::take(&mut self.batches);
// Concat all batches.
let mut batch = Batch::concat(batches)?;
// TODO(yingwen): metrics for sorted and unsorted batches.
if !self.is_sorted {
// Slow path. We need to merge overlapping batches. For simplicity, we
// just sort all the batches and remove duplicates.
batch.sort_and_dedup()?;
// We don't need to remove duplicates if the timestamps of the batches
// don't overlap.
}
// Filter rows by op type. Currently, the reader only removes deleted rows but doesn't filter
// rows by sequence, for simplicity and performance reasons.
batch.filter_deleted()?;
Ok(Some(batch))
}
}
/// A `Node` represents an individual input data source to be merged.
struct Node {
/// Data source of this `Node`.
source: Source,
/// Current batch to be read.
///
/// `None` means the `source` has reached EOF.
current_batch: Option<CompareFirst>,
}
impl Node {
/// Initializes a node.
///
/// It tries to fetch one batch from the `source`.
async fn new(mut source: Source) -> Result<Node> {
let current_batch = source.next_batch().await?.map(CompareFirst);
Ok(Node {
source,
current_batch,
})
}
/// Returns whether the node has reached EOF and has no batch left to read.
fn is_eof(&self) -> bool {
self.current_batch.is_none()
}
/// Returns the primary key of the current batch.
///
/// # Panics
/// Panics if the node has reached EOF.
fn primary_key(&self) -> &[u8] {
self.current_batch().primary_key()
}
/// Returns the current batch.
///
/// # Panics
/// Panics if the node has reached EOF.
fn current_batch(&self) -> &Batch {
&self.current_batch.as_ref().unwrap().0
}
/// Returns the current batch and fetches the next batch
/// from the source.
///
/// # Panics
/// Panics if the node has reached EOF.
async fn fetch_batch(&mut self) -> Result<Batch> {
let current = self.current_batch.take().unwrap();
self.current_batch = self.source.next_batch().await?.map(CompareFirst);
Ok(current.0)
}
}
impl PartialEq for Node {
fn eq(&self, other: &Node) -> bool {
self.current_batch == other.current_batch
}
}
impl Eq for Node {}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Node {
fn cmp(&self, other: &Node) -> Ordering {
// The std binary heap is a max-heap, but we want the nodes in ascending
// order, so we compare the nodes in reverse order.
other.current_batch.cmp(&self.current_batch)
}
}
/// Type to compare [Batch] by first row.
///
/// It ignores op type as sequence is enough to distinguish different rows.
struct CompareFirst(Batch);
impl PartialEq for CompareFirst {
fn eq(&self, other: &Self) -> bool {
self.0.primary_key() == other.0.primary_key()
&& self.0.first_timestamp() == other.0.first_timestamp()
&& self.0.first_sequence() == other.0.first_sequence()
}
}
impl Eq for CompareFirst {}
impl PartialOrd for CompareFirst {
fn partial_cmp(&self, other: &CompareFirst) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for CompareFirst {
/// Compares by primary key, time index, sequence desc.
fn cmp(&self, other: &CompareFirst) -> Ordering {
self.0
.primary_key()
.cmp(other.0.primary_key())
.then_with(|| self.0.first_timestamp().cmp(&other.0.first_timestamp()))
.then_with(|| other.0.first_sequence().cmp(&self.0.first_sequence()))
}
}
#[cfg(test)]
mod tests {
use api::v1::OpType;
use super::*;
use crate::test_util::{new_batch, VecBatchReader};
#[tokio::test]
async fn test_merge_reader_empty() {
let mut reader = MergeReaderBuilder::new().build().await.unwrap();
assert!(reader.next_batch().await.unwrap().is_none());
assert!(reader.next_batch().await.unwrap().is_none());
}
async fn check_merge_result(reader: &mut MergeReader, expect: &[Batch]) {
let mut result = Vec::new();
while let Some(batch) = reader.next_batch().await.unwrap() {
result.push(batch);
}
assert_eq!(expect, result);
}
#[tokio::test]
async fn test_merge_non_overlapping() {
let reader1 = VecBatchReader::new(&[
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(
b"k1",
&[7, 8],
&[17, 18],
&[OpType::Put, OpType::Delete],
&[27, 28],
),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
),
]);
let reader2 = VecBatchReader::new(&[new_batch(
b"k1",
&[4, 5],
&[14, 15],
&[OpType::Put, OpType::Put],
&[24, 25],
)]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
.push_batch_iter(Box::new(reader2))
.build()
.await
.unwrap();
check_merge_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2, 4, 5, 7],
&[11, 12, 14, 15, 17],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[21, 22, 24, 25, 27],
),
new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]),
],
)
.await;
}
#[tokio::test]
async fn test_merge_overlapping() {
let reader1 = VecBatchReader::new(&[
new_batch(
b"k1",
&[1, 2],
&[11, 12],
&[OpType::Put, OpType::Put],
&[21, 22],
),
new_batch(
b"k1",
&[4, 5],
&[14, 15],
// This overrides 4 and deletes 5.
&[OpType::Put, OpType::Delete],
&[24, 25],
),
new_batch(
b"k2",
&[2, 3],
&[12, 13],
// This deletes 2.
&[OpType::Delete, OpType::Put],
&[22, 23],
),
]);
let reader2 = VecBatchReader::new(&[
new_batch(
b"k1",
&[3, 4, 5],
&[10, 10, 10],
&[OpType::Put, OpType::Put, OpType::Put],
&[33, 34, 35],
),
new_batch(
b"k2",
&[1, 10],
&[11, 20],
&[OpType::Put, OpType::Put],
&[21, 30],
),
]);
let mut reader = MergeReaderBuilder::new()
.push_batch_reader(Box::new(reader1))
.push_batch_iter(Box::new(reader2))
.build()
.await
.unwrap();
check_merge_result(
&mut reader,
&[
new_batch(
b"k1",
&[1, 2, 3, 4],
&[11, 12, 10, 14],
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
&[21, 22, 33, 24],
),
new_batch(
b"k2",
&[1, 3, 10],
&[11, 13, 20],
&[OpType::Put, OpType::Put, OpType::Put],
&[21, 23, 30],
),
],
)
.await;
}
}
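
To make the comparator semantics concrete, here is a hedged sketch, written as it might appear in this module's tests (new_batch comes from crate::test_util, as above):

// Equal primary key and timestamp, different sequences.
let older = CompareFirst(new_batch(b"k1", &[1], &[10], &[OpType::Put], &[1]));
let newer = CompareFirst(new_batch(b"k1", &[1], &[20], &[OpType::Put], &[2]));
// Sequences compare in descending order, so the newer write is "smaller"
// and pops out of the reversed max-heap first.
assert_eq!(std::cmp::Ordering::Less, newer.cmp(&older));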


@@ -21,6 +21,7 @@ use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use crate::error::{InvalidMetadataSnafu, Result};
@@ -35,15 +36,23 @@ pub struct ParquetWriter<'a> {
file_path: &'a str,
/// Input data source.
source: Source,
/// Region metadata of the source and the target SST.
metadata: RegionMetadataRef,
object_store: ObjectStore,
}
impl<'a> ParquetWriter<'a> {
/// Creates a new parquet SST writer.
pub fn new(file_path: &'a str, source: Source, object_store: ObjectStore) -> ParquetWriter {
pub fn new(
file_path: &'a str,
metadata: RegionMetadataRef,
source: Source,
object_store: ObjectStore,
) -> ParquetWriter {
ParquetWriter {
file_path,
source,
metadata,
object_store,
}
}
@@ -52,11 +61,9 @@ impl<'a> ParquetWriter<'a> {
///
/// Returns the [SstInfo] if the SST is written.
pub async fn write_all(&mut self, opts: &WriteOptions) -> Result<Option<SstInfo>> {
let metadata = self.source.metadata();
let json = metadata.to_json().context(InvalidMetadataSnafu)?;
let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
let ts_column = metadata.time_index_column();
let ts_column = self.metadata.time_index_column();
// TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
let props_builder = WriterProperties::builder()
@@ -78,7 +85,7 @@ impl<'a> ParquetWriter<'a> {
);
let writer_props = props_builder.build();
let write_format = WriteFormat::new(metadata);
let write_format = WriteFormat::new(self.metadata.clone());
let mut buffered_writer = BufferedWriter::try_new(
self.file_path.to_string(),
self.object_store.clone(),
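
After this change, callers pass the region metadata explicitly instead of asking the Source for it; a hedged construction sketch (the path and variables are hypothetical placeholders):

let mut writer = ParquetWriter::new(
    "region_dir/example.parquet", // hypothetical file path
    metadata.clone(), // RegionMetadataRef, no longer obtained from `source`
    Source::Reader(Box::new(merge_reader)),
    object_store.clone(),
);
let sst_info = writer.write_all(&opts).await?; // opts: WriteOptions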


@@ -19,9 +19,10 @@ use std::sync::Arc;
use api::greptime_proto::v1;
use api::v1::value::ValueData;
use api::v1::SemanticType;
use api::v1::{OpType, SemanticType};
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -35,6 +36,7 @@ use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::error::Result;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::worker::WorkerGroup;
/// Env to test mito engine.
@@ -242,3 +244,72 @@ pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
value_data: Some(ValueData::TsMillisecondValue(data)),
}
}
/// A test reader that pops [Batch]es from a vector.
pub struct VecBatchReader {
batches: Vec<Batch>,
}
impl VecBatchReader {
pub fn new(batches: &[Batch]) -> VecBatchReader {
let batches = batches.iter().rev().cloned().collect();
VecBatchReader { batches }
}
}
#[async_trait::async_trait]
impl BatchReader for VecBatchReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
Ok(self.batches.pop())
}
}
impl Iterator for VecBatchReader {
type Item = Result<Batch>;
fn next(&mut self) -> Option<Result<Batch>> {
self.batches.pop().map(Ok)
}
}
pub fn new_batch_builder(
primary_key: &[u8],
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
field: &[u64],
) -> BatchBuilder {
let mut builder = BatchBuilder::new(primary_key.to_vec());
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap()
.push_field_array(
1,
Arc::new(UInt64Array::from_iter_values(field.iter().copied())),
)
.unwrap();
builder
}
pub fn new_batch(
primary_key: &[u8],
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
field: &[u64],
) -> Batch {
new_batch_builder(primary_key, timestamps, sequences, op_types, field)
.build()
.unwrap()
}
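
Since VecBatchReader implements both BatchReader and Iterator, the same fixture can feed either Source variant; a sketch assuming a MergeReaderBuilder named `builder`:

let batches = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
// As an async reader:
builder.push_batch_reader(Box::new(VecBatchReader::new(&batches)));
// Or as a batch iterator:
builder.push_batch_iter(Box::new(VecBatchReader::new(&batches)));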