feat: Implement merge reader (#225)

* feat: Check columns when constructing Batch

* feat: Merge reader skeleton

* test: Add tests for MergeReader

* feat: Use get_ref to compare row

* feat: Implement MergeReader

* test: Add more tests

* feat: Use MergeReader to implement ChunkReader

Now the ChunkReaderImpl use MergeReader as default reader. Also add more tests to MergeReader.

* docs: Describe the merge algo in merge.rs

Ports the doc comments from kudu to merge.rs to describe the idea of the
merge algorithm we used.

* test: Fix unit tests

* chore: Address CR comments

Panics if number of columns in batch is not equal to `BatchBuilder`'s

* chore: Address CR comments

* chore: Implement Debug and add test for Node
This commit is contained in:
evenyag
2022-09-09 11:35:51 +08:00
committed by GitHub
parent 37a425658c
commit 82dfe78321
8 changed files with 1111 additions and 52 deletions

View File

@@ -6,20 +6,17 @@ use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber};
use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef, MemtableSet};
use crate::read::{Batch, BatchReader, ConcatReader};
use crate::read::{BoxedBatchReader, MergeReaderBuilder};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor};
type BoxedIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
/// Chunk reader implementation.
// Now we use async-trait to implement the chunk reader, which is easier to implement than
// using `Stream`, maybe change to `Stream` if we find out it is more efficient and have
// necessary to do so.
pub struct ChunkReaderImpl {
schema: ProjectedSchemaRef,
iter: Option<BoxedIterator>,
sst_reader: ConcatReader,
sst_reader: BoxedBatchReader,
}
#[async_trait]
@@ -31,7 +28,7 @@ impl ChunkReader for ChunkReaderImpl {
}
async fn next_chunk(&mut self) -> Result<Option<Chunk>> {
let batch = match self.fetch_batch().await? {
let batch = match self.sst_reader.next_batch().await? {
Some(b) => b,
None => return Ok(None),
};
@@ -43,27 +40,8 @@ impl ChunkReader for ChunkReaderImpl {
}
impl ChunkReaderImpl {
pub fn new(
schema: ProjectedSchemaRef,
iter: BoxedIterator,
sst_reader: ConcatReader,
) -> ChunkReaderImpl {
ChunkReaderImpl {
schema,
iter: Some(iter),
sst_reader,
}
}
async fn fetch_batch(&mut self) -> Result<Option<Batch>> {
if let Some(iter) = &mut self.iter {
match iter.next() {
Some(b) => return Ok(Some(b?)),
None => self.iter = None,
}
}
self.sst_reader.next_batch().await
pub fn new(schema: ProjectedSchemaRef, sst_reader: BoxedBatchReader) -> ChunkReaderImpl {
ChunkReaderImpl { schema, sst_reader }
}
}
@@ -130,31 +108,32 @@ impl ChunkReaderBuilder {
.context(error::InvalidProjectionSnafu)?,
);
let num_sources = self.memtables.len() + self.files_to_read.len();
let mut reader_builder = MergeReaderBuilder::with_capacity(schema.clone(), num_sources)
.batch_size(self.iter_ctx.batch_size);
self.iter_ctx.projected_schema = Some(schema.clone());
let mut iters = Vec::with_capacity(self.memtables.len());
for mem in self.memtables {
let iter = mem.iter(&self.iter_ctx)?;
iters.push(iter);
reader_builder = reader_builder.push_batch_iter(iter);
}
// Now we just simply chain all iterators together, ignore duplications/ordering.
let iter = Box::new(iters.into_iter().flatten());
let read_opts = ReadOptions {
batch_size: self.iter_ctx.batch_size,
projected_schema: schema.clone(),
};
let mut sst_readers = Vec::with_capacity(self.files_to_read.len());
for file in &self.files_to_read {
let reader = self
.sst_layer
.read_sst(file.file_name(), &read_opts)
.await?;
sst_readers.push(reader);
reader_builder = reader_builder.push_batch_reader(reader);
}
let reader = ConcatReader::new(sst_readers);
Ok(ChunkReaderImpl::new(schema, iter, reader))
let reader = reader_builder.build();
Ok(ChunkReaderImpl::new(schema, Box::new(reader)))
}
}

View File

@@ -237,6 +237,15 @@ pub enum Error {
#[snafu(backtrace)]
source: crate::schema::Error,
},
#[snafu(display("Failed to push data to batch builder, source: {}", source))]
PushBatch {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to build batch, {}", msg))]
BuildBatch { msg: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -252,7 +261,8 @@ impl ErrorExt for Error {
| BatchMissingColumn { .. }
| BatchMissingTimestamp { .. }
| InvalidTimestamp { .. }
| InvalidProjection { .. } => StatusCode::InvalidArguments,
| InvalidProjection { .. }
| BuildBatch { .. } => StatusCode::InvalidArguments,
Utf8 { .. }
| EncodeJson { .. }
@@ -283,6 +293,8 @@ impl ErrorExt for Error {
| ReadParquetIo { .. }
| InvalidRegionState { .. }
| ReadWal { .. } => StatusCode::StorageUnavailable,
PushBatch { source, .. } => source.status_code(),
}
}

View File

@@ -11,7 +11,7 @@ pub mod manifest;
pub mod memtable;
pub mod metadata;
pub mod proto;
mod read;
pub mod read;
pub mod region;
pub mod schema;
mod snapshot;

View File

@@ -1,11 +1,19 @@
//! Common structs and utilities for read.
use async_trait::async_trait;
use datatypes::vectors::VectorRef;
mod merge;
use crate::error::Result;
use async_trait::async_trait;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{MutableVector, VectorRef};
pub use merge::{MergeReader, MergeReaderBuilder};
use snafu::{ensure, ResultExt};
use crate::error::{self, Result};
/// Storage internal representation of a batch of rows.
///
/// `Batch` must contain at least one column, but might not hold any row.
// Now the structure of `Batch` is still unstable, all pub fields may be changed.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Batch {
@@ -17,7 +25,15 @@ pub struct Batch {
}
impl Batch {
/// Create a new `Batch` from `columns`.
///
/// # Panics
/// Panics if
/// - `columns` is empty.
/// - vectors in `columns` have different length.
pub fn new(columns: Vec<VectorRef>) -> Batch {
Self::assert_columns(&columns);
Batch { columns }
}
@@ -26,6 +42,17 @@ impl Batch {
self.columns.len()
}
#[inline]
pub fn num_rows(&self) -> usize {
// The invariant of `Batch::new()` ensure columns isn't empty.
self.columns[0].len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
}
#[inline]
pub fn columns(&self) -> &[VectorRef] {
&self.columns
@@ -35,6 +62,121 @@ impl Batch {
pub fn column(&self, idx: usize) -> &VectorRef {
&self.columns[idx]
}
/// Slice the batch, returning a new batch.
///
/// # Panics
/// Panics if `offset + length > self.num_rows()`.
fn slice(&self, offset: usize, length: usize) -> Batch {
let columns = self
.columns
.iter()
.map(|v| v.slice(offset, length))
.collect();
Batch { columns }
}
fn assert_columns(columns: &[VectorRef]) {
assert!(!columns.is_empty());
let length = columns[0].len();
assert!(columns.iter().all(|col| col.len() == length));
}
}
/// Reusable [Batch] builder.
pub struct BatchBuilder {
builders: Vec<Box<dyn MutableVector>>,
}
impl BatchBuilder {
/// Create a new `BatchBuilder` from data types with given `capacity`.
///
/// # Panics
/// Panics if `types` is empty.
pub fn with_capacity<'a, I>(types: I, capacity: usize) -> BatchBuilder
where
I: IntoIterator<Item = &'a ConcreteDataType>,
{
let builders: Vec<_> = types
.into_iter()
.map(|t| t.create_mutable_vector(capacity))
.collect();
assert!(!builders.is_empty());
BatchBuilder { builders }
}
/// Returns number of rows already in this builder.
#[inline]
pub fn num_rows(&self) -> usize {
self.builders[0].len()
}
/// Returns true if no rows in this builder.
#[inline]
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
}
/// Extend the builder by slice of batch.
///
/// # Panics
/// Panics if
/// - `offset + length > batch.num_rows()`.
/// - Number of columns in `batch` is not equal to the builder's.
pub fn extend_slice_of(&mut self, batch: &Batch, offset: usize, length: usize) -> Result<()> {
assert_eq!(self.builders.len(), batch.num_columns());
for (builder, column) in self.builders.iter_mut().zip(batch.columns()) {
builder
.extend_slice_of(&**column, offset, length)
.context(error::PushBatchSnafu)?;
}
Ok(())
}
/// Push `i-th` row of batch into the builder.
///
/// # Panics
/// Panics if
/// - `i` is out of bound.
/// - Number of columns in `batch` is not equal to the builder's.
pub fn push_row_of(&mut self, batch: &Batch, i: usize) -> Result<()> {
assert_eq!(self.builders.len(), batch.num_columns());
for (builder, column) in self.builders.iter_mut().zip(batch.columns()) {
let value = column.get_ref(i);
builder
.push_value_ref(value)
.context(error::PushBatchSnafu)?;
}
Ok(())
}
/// Create a new [Batch] and reset this builder.
pub fn build(&mut self) -> Result<Batch> {
// Checks length of each builder.
let rows = self.num_rows();
for (i, builder) in self.builders.iter().enumerate() {
ensure!(
rows == builder.len(),
error::BuildBatchSnafu {
msg: format!(
"expect row num {} but builder {} has {}",
rows,
i,
builder.len()
),
}
);
}
let columns = self.builders.iter_mut().map(|b| b.to_vector()).collect();
Ok(Batch { columns })
}
}
/// Async batch reader.
@@ -80,7 +222,11 @@ impl BatchReader for ConcatReader {
let reader = &mut self.readers[self.curr_idx];
match reader.next_batch().await? {
Some(batch) => return Ok(Some(batch)),
Some(batch) => {
if !batch.is_empty() {
return Ok(Some(batch));
}
}
None => self.curr_idx += 1,
}
}
@@ -105,9 +251,9 @@ mod tests {
#[tokio::test]
async fn test_concat_multiple_readers() {
let readers = vec![
read_util::build_boxed_vec_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]),
read_util::build_boxed_vec_reader(&[&[(4, None)]]),
read_util::build_boxed_vec_reader(&[&[(5, Some(5)), (6, Some(6))]]),
read_util::build_boxed_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]),
read_util::build_boxed_reader(&[&[(4, None)]]),
read_util::build_boxed_reader(&[&[(5, Some(5)), (6, Some(6))]]),
];
let mut reader = ConcatReader::new(readers);
@@ -127,10 +273,10 @@ mod tests {
#[tokio::test]
async fn test_concat_reader_with_empty_reader() {
let readers = vec![
read_util::build_boxed_vec_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]),
read_util::build_boxed_reader(&[&[(1, Some(1)), (2, Some(2))], &[(3, None)]]),
// Empty reader.
read_util::build_boxed_vec_reader(&[&[]]),
read_util::build_boxed_vec_reader(&[&[(5, Some(5)), (6, Some(6))]]),
read_util::build_boxed_reader(&[&[]]),
read_util::build_boxed_reader(&[&[(5, Some(5)), (6, Some(6))]]),
];
let mut reader = ConcatReader::new(readers);

View File

@@ -0,0 +1,812 @@
//! Merge reader.
//!
//! The implementation of [`MergeReader`] is inspired by
//! [`kudu's MergeIterator`](https://github.com/apache/kudu/blob/9021f275824faa2bdfe699786957c40c219697c1/src/kudu/common/generic_iterators.cc#L107)
//! and [`CeresDB's MergeIterator`](https://github.com/CeresDB/ceresdb/blob/02a7e3100f47cf16aa6c245ed529a6978be20fbd/analytic_engine/src/row_iter/merge.rs)
//!
//! The main idea of the merge algorithm is to maintain a `merge window`. The window describes,
//! at any given time, the key range where we expect to find the row with the smallest key.
//! A [`Node`] (known as the sub-iterator in kudu) whose NEXT overlaps with the `merge window`
//! is said to be actively participating in the merge.
//!
//! The `merge window` is defined as follows:
//! 1. The window's start is the smallest lower bound of all nodes. We
//! refer to the node that owns this lower bound as LOW.
//! 2. The windows end is the smallest upper bound of all nodes whose
//! lower bounds are less than or equal to LOW's upper bound.
//! 2a. The window's end could be LOW's upper bound itself, if it is the smallest
//! upper bound, but this isn't necessarily the case.
//! 3. The merge window's dimensions change as the merge proceeds, though it
//! only ever moves "to the right" (i.e. the window start/end only increase).
//!
//! We can divide the nodes into two sets, one for whose next rows overlap with the `merge window`,
//! another for whose next rows do not. The merge steady state resembles that of a traditional
//! heap-based merge: the top-most node is popped from HOT, the lower bound is copied to the output
//! and advanced, and the node is pushed back to HOT.
//!
//! In the steady state, we need to move nodes from COLD to HOT whenever the end of the merge window
//! moves; that's a sign that the window may now overlap with a NEXT belonging to a nodes in the
//! second set (COLD). The end of the merge window moves when a node is fully exhausted (i.e. all rows have
//! been copied to the output), or when a node finishes its NEXT and needs to peek again.
//!
//! At any given time, the NEXT belonging to the top-most node in COLD is nearest the merge window.
//! When the merge window's end has moved and we need to refill HOT, the top-most node in COLD is
//! the best candidate. To figure out whether it should be moved, we compare its NEXT's lower bound
//! against the upper bound in HOT's first node: if the lower bound is less than or equal to the key,
//! we move the node from COLD to HOT. On the flip side, when a node from HOT finishes its NEXT and peeks
//! again, we also need to check whether it has exited the merge window. The approach is similar: if
//! its NEXT's lower bound is greater than the upper bound of HOT'S first node, it's time to move it to COLD.
//!
//! A full description of the merge algorithm could be found in [`kudu's comment`](https://github.com/apache/kudu/blob/9021f275824faa2bdfe699786957c40c219697c1/src/kudu/common/generic_iterators.cc#L349)
//! and the [google doc](https://docs.google.com/document/d/1uP0ubjM6ulnKVCRrXtwT_dqrTWjF9tlFSRk0JN2e_O0/edit#).
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::fmt;
use async_trait::async_trait;
use store_api::storage::consts;
use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::read::{Batch, BatchBuilder, BatchReader, BoxedBatchReader};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef};
/// Batch data source.
enum Source {
// To avoid the overhead of async-trait (typically a heap allocation), wraps the
// BatchIterator into an enum instead of converting the iterator into a BatchReader.
Iter(BoxedBatchIterator),
Reader(BoxedBatchReader),
}
impl Source {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
match self {
Source::Iter(iter) => iter.next().transpose(),
Source::Reader(reader) => reader.next_batch().await,
}
}
/// Fetch next non empty batch.
async fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.next_batch().await? {
if !batch.is_empty() {
return Ok(Some(batch));
}
}
Ok(None)
}
}
impl fmt::Debug for Source {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Source::Iter(_) => write!(f, "Iter(..)"),
Source::Reader(_) => write!(f, "Reader(..)"),
}
}
}
/// Reference to a row in [BatchCursor].
#[derive(Debug)]
struct RowCursor<'a> {
batch: &'a Batch,
pos: usize,
}
impl<'a> RowCursor<'a> {
#[inline]
fn compare(&self, schema: &ProjectedSchema, other: &RowCursor) -> Ordering {
schema.compare_row_of_batch(self.batch, self.pos, other.batch, other.pos)
}
}
/// A `BatchCursor` wraps the `Batch` and allows reading the `Batch` by row.
#[derive(Debug)]
struct BatchCursor {
/// Current buffered `Batch`.
///
/// `Batch` must contains at least one row.
batch: Batch,
/// Index of current row.
///
/// `pos == batch.num_rows()` indicates no more rows to read.
pos: usize,
}
impl BatchCursor {
/// Create a new `BatchCursor`.
///
/// # Panics
/// Panics if `batch` is empty.
fn new(batch: Batch) -> BatchCursor {
assert!(!batch.is_empty());
BatchCursor { batch, pos: 0 }
}
/// Returns true if there are remaining rows to read.
#[inline]
fn is_valid(&self) -> bool {
!self.is_empty()
}
/// Returns first row of current batch.
///
/// # Panics
/// Panics if `self` is invalid.
fn first_row(&self) -> RowCursor {
assert!(self.is_valid());
RowCursor {
batch: &self.batch,
pos: self.pos,
}
}
/// Returns last row of current batch.
///
/// # Panics
/// Panics if `self` is invalid.
fn last_row(&self) -> RowCursor {
assert!(self.is_valid());
RowCursor {
batch: &self.batch,
pos: self.batch.num_rows() - 1,
}
}
#[inline]
fn is_empty(&self) -> bool {
self.pos >= self.batch.num_rows()
}
/// Take slice of batch with at most `length` rows from the cursor, then
/// advance the cursor.
///
/// # Panics
/// Panics if `self` is invalid.
fn take_batch_slice(&mut self, length: usize) -> Batch {
let length = length.min(self.batch.num_rows() - self.pos);
let batch = self.batch.slice(self.pos, length);
self.pos += batch.num_rows();
batch
}
/// Push at most `length` rows from `self` to the `builder` and advance the cursor.
///
/// # Panics
/// Panics if `self` is invalid.
fn push_rows_to(&mut self, builder: &mut BatchBuilder, length: usize) -> Result<()> {
let length = length.min(self.batch.num_rows() - self.pos);
builder.extend_slice_of(&self.batch, self.pos, length)?;
self.pos += length;
Ok(())
}
/// Push next row from `self` to the `builder` and advance the cursor.
///
/// # Panics
/// Panics if `self` is invalid.
fn push_next_row_to(&mut self, builder: &mut BatchBuilder) -> Result<()> {
builder.push_row_of(&self.batch, self.pos)?;
self.pos += 1;
Ok(())
}
}
/// A `Node` represent an individual input data source to be merged.
struct Node {
/// Schema of data source.
schema: ProjectedSchemaRef,
/// Data source of this `Node`.
source: Source,
/// Current batch to be read.
///
/// `None` means the `source` has reached EOF.
cursor: Option<BatchCursor>,
}
impl fmt::Debug for Node {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Node")
.field("source", &self.source)
.field("cursor", &self.cursor)
.finish_non_exhaustive()
}
}
impl Node {
async fn new(schema: ProjectedSchemaRef, mut source: Source) -> Result<Node> {
let cursor = source.next_non_empty_batch().await?.map(BatchCursor::new);
Ok(Node {
schema,
source,
cursor,
})
}
/// Returns the reference to the cursor.
///
/// # Panics
/// Panics if `self` is EOF.
fn cursor_ref(&self) -> &BatchCursor {
self.cursor.as_ref().unwrap()
}
/// Returns first row in cursor.
///
/// # Panics
/// Panics if `self` is EOF.
fn first_row(&self) -> RowCursor {
self.cursor_ref().first_row()
}
/// Returns last row in cursor.
///
/// # Panics
/// Panics if `self` is EOF.
fn last_row(&self) -> RowCursor {
self.cursor_ref().last_row()
}
/// Compare first row of two nodes.
///
/// # Panics
/// Panics if
/// - either `self` or `other` is EOF.
fn compare_first_row(&self, other: &Node) -> Ordering {
self.first_row().compare(&self.schema, &other.first_row())
}
/// Returns true if no more batch could be fetched from this node.
fn is_eof(&self) -> bool {
self.cursor.is_none()
}
/// Returns true if the key range of current batch in `self` is behind (exclusive) current
/// batch in `other`.
///
/// # Panics
/// Panics if
/// - either `self` or `other` is EOF.
fn is_behind(&self, other: &Node) -> bool {
let first = self.first_row();
let last = other.last_row();
// `self` is after `other` if min (first) row of `self` is greater than
// max (last) row of `other`.
first.compare(&self.schema, &last) == Ordering::Greater
}
/// Fetch next batch and reset its cursor if `self` isn't EOF and the cursor
/// is empty.
///
/// Returns true if a new batch has been fetched.
async fn maybe_fetch_next_batch(&mut self) -> Result<bool> {
let need_fetch = !self.is_eof() && self.cursor_ref().is_empty();
if !need_fetch {
// Still has remaining rows, no need to fetch.
return Ok(false);
}
// This ensure the cursor is either non empty or None (EOF).
match self.source.next_non_empty_batch().await? {
Some(batch) => {
self.cursor = Some(BatchCursor::new(batch));
Ok(true)
}
None => {
// EOF
self.cursor = None;
Ok(false)
}
}
}
/// Returns the mutable reference to the cursor.
///
/// # Panics
/// Panics if `self` is EOF.
fn cursor_mut(&mut self) -> &mut BatchCursor {
self.cursor.as_mut().unwrap()
}
/// Take batch from this node.
///
/// # Panics
/// Panics if `self` is EOF.
fn take_batch_slice(&mut self, length: usize) -> Batch {
self.cursor_mut().take_batch_slice(length)
}
/// Push at most `length` rows from `self` to the `builder`.
///
/// # Panics
/// Panics if `self` is EOF.
fn push_rows_to(&mut self, builder: &mut BatchBuilder, length: usize) -> Result<()> {
self.cursor_mut().push_rows_to(builder, length)
}
/// Push next row from `self` to the `builder`.
///
/// # Panics
/// Panics if `self` is EOF.
fn push_next_row_to(&mut self, builder: &mut BatchBuilder) -> Result<()> {
self.cursor_mut().push_next_row_to(builder)
}
}
impl PartialEq for Node {
fn eq(&self, other: &Node) -> bool {
self.compare_first_row(other) == Ordering::Equal
}
}
impl Eq for Node {}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Node {
fn cmp(&self, other: &Node) -> Ordering {
// The std binary heap is a max heap, but we want the nodes are ordered in
// ascend order, so we compare the nodes in reverse order.
other.compare_first_row(self)
}
}
/// A reader that would sort and merge `Batch` from multiple sources by key.
///
/// `Batch` from each `Source` **must** be sorted.
pub struct MergeReader {
/// Whether the reader has been initialized.
initialized: bool,
/// Schema of data source.
schema: ProjectedSchemaRef,
/// Input data sources.
///
/// All data source must have same schema. Initialize the reader would
/// convert all `Source`s into `Node`s and then clear this vector.
sources: Vec<Source>,
/// Holds `Node` whose key range of current batch **is** overlapped with the merge window.
///
/// `Node` in this heap **must** not be empty. A `merge window` is the key range of the
/// root node in the `hot` heap.
hot: BinaryHeap<Node>,
/// Holds `Node` whose key range of current batch **isn't** overlapped with the merge window.
///
/// `Node` in this heap **must** not be empty.
cold: BinaryHeap<Node>,
/// Suggested row number of each batch.
///
/// The size of the batch yield from this reader may not always equal to this suggested size.
batch_size: usize,
/// Buffered batch.
batch_builder: BatchBuilder,
}
#[async_trait]
impl BatchReader for MergeReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.fetch_next_batch().await
}
}
pub struct MergeReaderBuilder {
schema: ProjectedSchemaRef,
sources: Vec<Source>,
batch_size: usize,
}
impl MergeReaderBuilder {
pub fn new(schema: ProjectedSchemaRef) -> Self {
MergeReaderBuilder::with_capacity(schema, 0)
}
pub fn with_capacity(schema: ProjectedSchemaRef, capacity: usize) -> Self {
MergeReaderBuilder {
schema,
sources: Vec::with_capacity(capacity),
batch_size: consts::READ_BATCH_SIZE,
}
}
pub fn push_batch_iter(mut self, iter: BoxedBatchIterator) -> Self {
self.sources.push(Source::Iter(iter));
self
}
pub fn push_batch_reader(mut self, reader: BoxedBatchReader) -> Self {
self.sources.push(Source::Reader(reader));
self
}
pub fn batch_size(mut self, size: usize) -> Self {
self.batch_size = size;
self
}
pub fn build(self) -> MergeReader {
let num_sources = self.sources.len();
let column_schemas = self.schema.schema_to_read().schema().column_schemas();
let batch_builder = BatchBuilder::with_capacity(
column_schemas.iter().map(|c| &c.data_type),
self.batch_size,
);
MergeReader {
initialized: false,
schema: self.schema,
sources: self.sources,
hot: BinaryHeap::with_capacity(num_sources),
cold: BinaryHeap::with_capacity(num_sources),
batch_size: self.batch_size,
batch_builder,
}
}
}
impl MergeReader {
/// Initialize the reader if it has not yet been initialized.
async fn try_init(&mut self) -> Result<()> {
if self.initialized {
return Ok(());
}
if self.sources.is_empty() {
self.initialized = true;
return Ok(());
}
for source in self.sources.drain(..) {
let node = Node::new(self.schema.clone(), source).await?;
if !node.is_eof() {
self.cold.push(node);
}
}
self.refill_hot();
self.initialized = true;
Ok(())
}
async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
self.try_init().await?;
while !self.hot.is_empty() && self.batch_builder.num_rows() < self.batch_size {
if self.hot.len() == 1 {
// No need to do merge sort if only one batch in the hot heap.
let fetch_row_num = self.batch_size - self.batch_builder.num_rows();
if let Some(batch) = self.fetch_batch_from_hottest(fetch_row_num).await? {
// The builder is empty and we have fetched a new batch from this node.
return Ok(Some(batch));
}
// Otherwise, some rows may have been pushed into the builder.
} else {
// We could only fetch one row from the hottest node.
self.fetch_one_row_from_hottest().await?;
}
}
// Check buffered rows in the builder.
if self.batch_builder.is_empty() {
Ok(None)
} else {
self.batch_builder.build().map(Some)
}
}
/// Move nodes in `cold` heap, whose key range is overlapped with current merge
/// window to `hot` heap.
fn refill_hot(&mut self) {
while !self.cold.is_empty() {
if let Some(merge_window) = self.hot.peek() {
let warmest = self.cold.peek().unwrap();
if warmest.is_behind(merge_window) {
// if the warmest node in the `cold` heap is totally after the
// `merge_window`, then no need to add more nodes into the `hot`
// heap for merge sorting.
break;
}
}
let warmest = self.cold.pop().unwrap();
self.hot.push(warmest);
}
}
/// Fetch at most `fetch_row_num` from the hottest node and attempt to return them directly
/// instead of pushing into the builder if the `self.batch_builder` is empty.
async fn fetch_batch_from_hottest(&mut self, fetch_row_num: usize) -> Result<Option<Batch>> {
assert_eq!(1, self.hot.len());
let mut hottest = self.hot.pop().unwrap();
let batch = if self.batch_builder.is_empty() {
Some(hottest.take_batch_slice(fetch_row_num))
} else {
hottest.push_rows_to(&mut self.batch_builder, fetch_row_num)?;
None
};
self.reheap(hottest).await?;
Ok(batch)
}
/// Fetch one row from the hottest node.
async fn fetch_one_row_from_hottest(&mut self) -> Result<()> {
let mut hottest = self.hot.pop().unwrap();
hottest.push_next_row_to(&mut self.batch_builder)?;
self.reheap(hottest).await
}
/// Fetch next batch from this node and reset its cursor, then push the node back to a
/// proper heap.
async fn reheap(&mut self, mut node: Node) -> Result<()> {
let fetched_new_batch = node.maybe_fetch_next_batch().await?;
if node.is_eof() {
// The merge window would be updated, need to refill the hot heap.
self.refill_hot();
} else if fetched_new_batch {
// A new batch has been fetched from the node, thus the key range of this node
// has been changed. Try to find a proper heap for this node.
let node_is_cold = if let Some(hottest) = self.hot.peek() {
// Now key range of this node is behind the hottest node's.
node.is_behind(hottest)
} else {
false
};
if node_is_cold {
self.cold.push(node);
} else {
self.hot.push(node);
}
// Anyway, the merge window has been changed, we need to refill the hot heap.
self.refill_hot();
} else {
// No new batch has been fetched, so the end key of merge window has not been
// changed, we could just put the node back to the hot heap.
self.hot.push(node);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use datatypes::prelude::ScalarVector;
use datatypes::vectors::Int64Vector;
use super::*;
use crate::test_util::read_util;
#[tokio::test]
async fn test_merge_reader_empty() {
let schema = read_util::new_projected_schema();
let mut reader = MergeReaderBuilder::new(schema).build();
assert!(reader.next_batch().await.unwrap().is_none());
// Call next_batch() again is allowed.
assert!(reader.next_batch().await.unwrap().is_none());
}
#[tokio::test]
async fn test_node() {
let schema = read_util::new_projected_schema();
let left_source = read_util::build_boxed_iter(&[&[(1, None), (3, None), (5, None)]]);
let mut left = Node::new(schema.clone(), Source::Iter(left_source))
.await
.unwrap();
let right_source = read_util::build_boxed_reader(&[&[(2, None), (3, None), (6, None)]]);
let mut right = Node::new(schema.clone(), Source::Reader(right_source))
.await
.unwrap();
// We use reverse order for a node.
assert!(left > right);
assert_ne!(left, right);
// Advance the left and right node.
left.cursor_mut().pos += 1;
right.cursor_mut().pos += 1;
assert_eq!(left, right);
// Check Debug is implemented.
let output = format!("{:?}", left);
assert!(output.contains("cursor"));
assert!(output.contains("pos: 1"));
let output = format!("{:?}", left.first_row());
assert!(output.contains("pos: 1"));
}
type Batches<'a> = &'a [&'a [(i64, Option<i64>)]];
fn build_merge_reader(sources: &[Batches], num_iter: usize, batch_size: usize) -> MergeReader {
let schema = read_util::new_projected_schema();
let mut builder =
MergeReaderBuilder::with_capacity(schema, sources.len()).batch_size(batch_size);
for (i, source) in sources.iter().enumerate() {
if i < num_iter {
builder = builder.push_batch_iter(read_util::build_boxed_iter(source));
} else {
builder = builder.push_batch_reader(read_util::build_boxed_reader(source));
}
}
builder.build()
}
async fn check_merge_reader_result(mut reader: MergeReader, input: &[Batches<'_>]) {
let mut expect: Vec<_> = input
.iter()
.flat_map(|v| v.iter())
.flat_map(|v| v.iter().copied())
.collect();
expect.sort_by_key(|k| k.0);
let result = read_util::collect_kv_batch(&mut reader).await;
assert_eq!(expect, result);
// Call next_batch() again is allowed.
assert!(reader.next_batch().await.unwrap().is_none());
}
async fn check_merge_reader_by_batch(mut reader: MergeReader, expect_batches: Batches<'_>) {
let mut result = Vec::new();
while let Some(batch) = reader.next_batch().await.unwrap() {
let key = batch
.column(0)
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
let value = batch
.column(1)
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
let batch: Vec<_> = key
.iter_data()
.zip(value.iter_data())
.map(|(k, v)| (k.unwrap(), v))
.collect();
result.push(batch);
}
for (expect, actual) in expect_batches.iter().zip(result.iter()) {
assert_eq!(expect, actual);
}
}
#[tokio::test]
async fn test_merge_multiple_interleave() {
common_telemetry::init_default_ut_logging();
let input: &[Batches] = &[
&[&[(1, Some(1)), (5, Some(5)), (9, Some(9))]],
&[&[(2, Some(2)), (3, Some(3)), (8, Some(8))]],
&[&[(7, Some(7)), (12, Some(12))]],
];
let reader = build_merge_reader(input, 1, 3);
check_merge_reader_result(reader, input).await;
let input: &[Batches] = &[
&[
&[(1, Some(1)), (2, Some(2))],
&[(3, Some(3)), (4, Some(4))],
&[(5, Some(5)), (12, Some(12))],
],
&[&[(6, Some(6)), (7, Some(7)), (18, Some(18))]],
&[&[(13, Some(13)), (15, Some(15))]],
];
let reader = build_merge_reader(input, 1, 3);
check_merge_reader_by_batch(
reader,
&[
// The former two batches could be returned directly.
&[(1, Some(1)), (2, Some(2))],
&[(3, Some(3)), (4, Some(4))],
&[(5, Some(5)), (6, Some(6)), (7, Some(7))],
&[(12, Some(12)), (13, Some(13)), (15, Some(15))],
&[(18, Some(18))],
],
)
.await;
let input: &[Batches] = &[
&[
&[(1, Some(1)), (2, Some(2))],
&[(5, Some(5)), (9, Some(9))],
&[(14, Some(14)), (17, Some(17))],
],
&[&[(6, Some(6)), (7, Some(7))], &[(15, Some(15))]],
];
let reader = build_merge_reader(input, 1, 2);
check_merge_reader_by_batch(
reader,
&[
&[(1, Some(1)), (2, Some(2))],
// Could not return batch (6, 7) directly.
&[(5, Some(5)), (6, Some(6))],
&[(7, Some(7)), (9, Some(9))],
&[(14, Some(14)), (15, Some(15))],
&[(17, Some(17))],
],
)
.await;
}
#[tokio::test]
async fn test_merge_one_source() {
common_telemetry::init_default_ut_logging();
let input: &[Batches] = &[&[
&[(1, Some(1)), (2, Some(2)), (3, Some(3))],
&[(4, Some(4)), (5, Some(5)), (6, Some(6))],
]];
let reader = build_merge_reader(input, 1, 2);
check_merge_reader_result(reader, input).await;
}
#[tokio::test]
async fn test_merge_with_empty_batch() {
let input: &[Batches] = &[
&[
&[(1, Some(1)), (2, Some(2))],
&[(3, Some(3)), (6, Some(6))],
&[],
&[],
&[(8, Some(8)), (12, Some(12))],
&[],
],
&[
&[(4, Some(4)), (5, Some(5))],
&[],
&[(15, None), (18, None), (20, None)],
],
&[&[(13, Some(13)), (19, None)], &[], &[]],
];
let reader = build_merge_reader(input, 1, 2);
check_merge_reader_result(reader, input).await;
}
#[tokio::test]
async fn test_merge_duplicate_key() {
let input: &[Batches] = &[
&[
&[(1, Some(1)), (5, Some(5)), (8, Some(8))],
&[(9, None), (11, None)],
&[(12, Some(12)), (15, None)],
],
&[&[(1, Some(1)), (3, Some(3)), (8, Some(8))], &[(16, None)]],
&[
&[(7, Some(7)), (12, Some(12))],
&[(15, None), (16, None), (17, None)],
],
&[&[(15, None)]],
];
let reader = build_merge_reader(input, 2, 2);
check_merge_reader_result(reader, input).await;
}
}

View File

@@ -140,7 +140,7 @@ async fn test_read_after_flush() {
tester.put(&[(3000, Some(300))]).await;
tester.wait_flush_done().await;
let expect = vec![(3000, Some(300)), (1000, Some(100)), (2000, Some(200))];
let expect = vec![(1000, Some(100)), (2000, Some(200)), (3000, Some(300))];
let output = tester.full_scan().await;
assert_eq!(expect, output);

View File

@@ -1,3 +1,4 @@
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
@@ -230,6 +231,20 @@ impl StoreSchema {
Ok(Batch::new(columns))
}
fn compare_row_of_batch(&self, left: &Batch, i: usize, right: &Batch, j: usize) -> Ordering {
let indices = self.full_key_indices();
for idx in indices {
let (left_col, right_col) = (left.column(idx), right.column(idx));
// Comparision of vector is done by virtual method calls currently. Consider using
// enum dispatch if this becomes bottleneck.
let order = left_col.get_ref(i).cmp(&right_col.get_ref(j));
if order != Ordering::Equal {
return order;
}
}
Ordering::Equal
}
fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result<StoreSchema> {
let column_schemas: Vec<_> = columns
.iter_all_columns()
@@ -300,6 +315,13 @@ impl StoreSchema {
fn num_columns(&self) -> usize {
self.schema.num_columns()
}
fn full_key_indices(&self) -> impl Iterator<Item = usize> {
// row key, sequence, op_type
(0..self.row_key_end)
.chain(std::iter::once(self.sequence_index()))
.chain(std::iter::once(self.op_type_index()))
}
}
impl TryFrom<ArrowSchema> for StoreSchema {
@@ -530,6 +552,25 @@ impl ProjectedSchema {
Batch::new(columns)
}
/// Compare `i-th` in `left` to `j-th` row in `right` by key (row key + internal columns).
///
/// The caller should ensure `left` and `right` have same schema as `self.schema_to_read()`.
///
/// # Panics
/// Panics if
/// - `i` or `j` is out of bound.
/// - `left` or `right` has insufficient column num.
#[inline]
pub fn compare_row_of_batch(
&self,
left: &Batch,
i: usize,
right: &Batch,
j: usize,
) -> Ordering {
self.schema_to_read.compare_row_of_batch(left, i, right, j)
}
fn build_schema_to_read(
region_schema: &RegionSchema,
projection: &Projection,

View File

@@ -2,10 +2,31 @@ use std::sync::Arc;
use async_trait::async_trait;
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector};
use crate::error::Result;
use crate::memtable::{BatchIterator, BoxedBatchIterator, RowOrdering};
use crate::metadata::RegionMetadata;
use crate::read::{Batch, BatchReader, BoxedBatchReader};
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
use crate::test_util::descriptor_util::RegionDescBuilder;
/// Create a new region schema (timestamp, v0).
fn new_region_schema() -> RegionSchemaRef {
let desc = RegionDescBuilder::new("read-util")
.enable_version_column(false)
.push_value_column(("v0", LogicalTypeId::Int64, true))
.build();
let metadata: RegionMetadata = desc.try_into().unwrap();
metadata.schema().clone()
}
/// Create a new projected schema (timestamp, v0).
pub fn new_projected_schema() -> ProjectedSchemaRef {
let region_schema = new_region_schema();
Arc::new(ProjectedSchema::new(region_schema, None).unwrap())
}
/// Build a new batch, with 0 sequence and op_type.
fn new_kv_batch(key_values: &[(i64, Option<i64>)]) -> Batch {
@@ -38,6 +59,28 @@ fn check_kv_batch(batches: &[Batch], expect: &[&[(i64, Option<i64>)]]) {
assert_eq!(batches.len(), expect.len());
}
pub async fn collect_kv_batch(reader: &mut dyn BatchReader) -> Vec<(i64, Option<i64>)> {
let mut result = Vec::new();
while let Some(batch) = reader.next_batch().await.unwrap() {
let key = batch
.column(0)
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
let value = batch
.column(1)
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
for (k, v) in key.iter_data().zip(value.iter_data()) {
result.push((k.unwrap(), v));
}
}
result
}
pub async fn check_reader_with_kv_batch(
reader: &mut dyn BatchReader,
expect: &[&[(i64, Option<i64>)]],
@@ -50,8 +93,9 @@ pub async fn check_reader_with_kv_batch(
check_kv_batch(&result, expect);
}
/// A reader for test that takes batch from Vec.
/// A reader for test that pop batch from Vec.
pub struct VecBatchReader {
schema: ProjectedSchemaRef,
batches: Vec<Batch>,
}
@@ -59,7 +103,10 @@ impl VecBatchReader {
fn new(mut batches: Vec<Batch>) -> VecBatchReader {
batches.reverse();
VecBatchReader { batches }
VecBatchReader {
schema: new_projected_schema(),
batches,
}
}
}
@@ -70,16 +117,38 @@ impl BatchReader for VecBatchReader {
}
}
impl Iterator for VecBatchReader {
type Item = Result<Batch>;
fn next(&mut self) -> Option<Result<Batch>> {
self.batches.pop().map(Ok)
}
}
impl BatchIterator for VecBatchReader {
fn schema(&self) -> ProjectedSchemaRef {
self.schema.clone()
}
fn ordering(&self) -> RowOrdering {
// TODO(yingwen): Allow setting the row ordering.
RowOrdering::Key
}
}
pub fn build_vec_reader(batches: &[&[(i64, Option<i64>)]]) -> VecBatchReader {
let batches: Vec<_> = batches
.iter()
.filter(|key_values| !key_values.is_empty())
.map(|key_values| new_kv_batch(key_values))
.collect();
VecBatchReader::new(batches)
}
pub fn build_boxed_vec_reader(batches: &[&[(i64, Option<i64>)]]) -> BoxedBatchReader {
pub fn build_boxed_reader(batches: &[&[(i64, Option<i64>)]]) -> BoxedBatchReader {
Box::new(build_vec_reader(batches))
}
pub fn build_boxed_iter(batches: &[&[(i64, Option<i64>)]]) -> BoxedBatchIterator {
Box::new(build_vec_reader(batches))
}