feat(mito): Implement operations like concat and sort for Batch (#2203)

* feat: Implement slice and first/last timestamp for Batch

* feat(mito): implements sort/concat for Batch

* chore: fix typo

* chore: remove comments

* feat: sort and dedup

* test: test batch operations

* chore: cast enum to test op type

* test: test filter related api

* style: fix clippy

* docs: comment for slice

* chore: address CR comment

Don't return Option in get_timestamp()/get_sequence()
Author: Yingwen
Date: 2023-08-22 20:03:02 +08:00
Committed by: GitHub
Parent: cd3755c615
Commit: cc3e198975
6 changed files with 551 additions and 16 deletions

View File

@@ -39,7 +39,8 @@ impl BooleanVector {
&self.array
}
pub(crate) fn as_boolean_array(&self) -> &BooleanArray {
/// Get the inner boolean array.
pub fn as_boolean_array(&self) -> &BooleanArray {
&self.array
}

View File

@@ -230,7 +230,8 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
}
}
pub(crate) fn as_arrow(&self) -> &PrimitiveArray<T::ArrowPrimitive> {
/// Get the inner arrow array.
pub fn as_arrow(&self) -> &PrimitiveArray<T::ArrowPrimitive> {
&self.array
}
@@ -245,7 +246,11 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
}
// To distinguish it from `Vector::slice()`.
fn get_slice(&self, offset: usize, length: usize) -> Self {
/// Slice the vector, returning a new vector.
///
/// # Panics
/// This function panics if `offset + length > self.len()`.
pub fn get_slice(&self, offset: usize, length: usize) -> Self {
let data = self.array.to_data().slice(offset, length);
Self::from_array_data(data)
}
@@ -295,8 +300,7 @@ impl<T: LogicalPrimitiveType> Vector for PrimitiveVector<T> {
}
fn slice(&self, offset: usize, length: usize) -> VectorRef {
let data = self.array.to_data().slice(offset, length);
Arc::new(Self::from_array_data(data))
Arc::new(self.get_slice(offset, length))
}
fn get(&self, index: usize) -> Value {
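Worth noting: `get_slice` returns the concrete `Self`, while the `Vector::slice` trait method must return a type-erased `VectorRef`, so `slice` is now a thin wrapper over `get_slice`. A minimal sketch of the distinction (hypothetical caller code, assuming an `Int64Vector`):

    let v = Int64Vector::from_vec(vec![1, 2, 3, 4]);
    let concrete = v.get_slice(1, 2); // concrete PrimitiveVector, no Arc or downcast
    let erased: VectorRef = v.slice(1, 2); // Arc<dyn Vector>
    assert_eq!(concrete.len(), erased.len());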

View File

@@ -309,7 +309,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))]
#[snafu(display(
"Invalid parquet SST file {}, location: {}, reason: {}",
file,
location,
reason
))]
InvalidParquet {
file: String,
reason: String,
@@ -332,6 +337,22 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display(
"Failed to compute arrow arrays, location: {}, source: {}",
location,
source
))]
ComputeArrow {
location: Location,
source: datatypes::arrow::error::ArrowError,
},
#[snafu(display("Failed to compute vector, location: {}, source: {}", location, source))]
ComputeVector {
location: Location,
source: datatypes::error::Error,
},
#[snafu(display(
"Primary key length mismatch, expect: {}, actual: {}, location: {}",
expect,
@@ -409,6 +430,8 @@ impl ErrorExt for Error {
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
ComputeArrow { .. } => StatusCode::Internal,
ComputeVector { .. } => StatusCode::Internal,
PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
SortValues { .. } => StatusCode::Unexpected,
CompactValues { source, .. } => source.status_code(),
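Both new variants map to `StatusCode::Internal`, following the crate's usual snafu pattern: fallible arrow kernels attach a location with `.context(ComputeArrowSnafu)` and vector-level operations with `.context(ComputeVectorSnafu)`. A sketch of the call-site shape (mirroring the usage in the read path below):

    let predicate = arrow::compute::neq_scalar(array, OpType::Delete as u8)
        .context(ComputeArrowSnafu)?;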

View File

@@ -16,17 +16,25 @@
use std::sync::Arc;
use api::v1::OpType;
use async_trait::async_trait;
use common_time::Timestamp;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector, Vector, VectorRef};
use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{DataType, ScalarVector};
use datatypes::value::ValueRef;
use datatypes::vectors::{
BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{ConvertVectorSnafu, InvalidBatchSnafu, Result};
use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
/// Storage internal representation of a batch of rows
/// for a primary key (time series).
@@ -91,15 +99,266 @@ impl Batch {
/// Returns the number of rows in the batch.
pub fn num_rows(&self) -> usize {
// All vectors have the same length so we use
// the length of timestamps vector.
self.timestamps.len()
// All vectors have the same length. We use the length of the sequences
// vector since it has a static type.
self.sequences.len()
}
/// Returns true if the number of rows in the batch is 0.
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
}
/// Returns the first timestamp in the batch.
pub fn first_timestamp(&self) -> Option<Timestamp> {
if self.timestamps.is_empty() {
return None;
}
Some(self.get_timestamp(0))
}
/// Returns the last timestamp in the batch.
pub fn last_timestamp(&self) -> Option<Timestamp> {
if self.timestamps.is_empty() {
return None;
}
Some(self.get_timestamp(self.timestamps.len() - 1))
}
/// Returns the first sequence in the batch or `None` if the batch is empty.
pub fn first_sequence(&self) -> Option<SequenceNumber> {
if self.sequences.is_empty() {
return None;
}
Some(self.get_sequence(0))
}
/// Returns the last sequence in the batch or `None` if the batch is empty.
pub fn last_sequence(&self) -> Option<SequenceNumber> {
if self.sequences.is_empty() {
return None;
}
Some(self.get_sequence(self.sequences.len() - 1))
}
/// Slice the batch, returning a new batch.
///
/// # Panics
/// Panics if `offset + length > self.num_rows()`.
pub fn slice(&self, offset: usize, length: usize) -> Batch {
let fields = self
.fields
.iter()
.map(|column| BatchColumn {
column_id: column.column_id,
data: column.data.slice(offset, length),
})
.collect();
// We skip using the builder to avoid validating the batch again.
Batch {
// Now we need to clone the primary key. We could try `Bytes` if
// this becomes a bottleneck.
primary_key: self.primary_key.clone(),
timestamps: self.timestamps.slice(offset, length),
sequences: Arc::new(self.sequences.get_slice(offset, length)),
op_types: Arc::new(self.op_types.get_slice(offset, length)),
fields,
}
}
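// Example (hypothetical four-row batch): `batch.slice(1, 2)` keeps rows 1
// and 2 only; the underlying arrays are shared rather than copied:
//
//     let sub = batch.slice(1, 2);
//     assert_eq!(2, sub.num_rows());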
/// Takes `batches` and concatenates them into one batch.
///
/// All `batches` must have the same primary key.
pub fn concat(mut batches: Vec<Batch>) -> Result<Batch> {
ensure!(
!batches.is_empty(),
InvalidBatchSnafu {
reason: "empty batches",
}
);
if batches.len() == 1 {
// Now we own `batches` so we can pop the only batch directly.
return Ok(batches.pop().unwrap());
}
let primary_key = std::mem::take(&mut batches[0].primary_key);
let first = &batches[0];
// We took the primary key from the first batch so we don't use `first.primary_key()`.
ensure!(
batches
.iter()
.skip(1)
.all(|b| b.primary_key() == primary_key),
InvalidBatchSnafu {
reason: "batches have different primary key",
}
);
ensure!(
batches
.iter()
.skip(1)
.all(|b| b.fields().len() == first.fields().len()),
InvalidBatchSnafu {
reason: "batches have different field num",
}
);
// We take the primary key from the first batch.
let mut builder = BatchBuilder::new(primary_key);
// Concat timestamps, sequences, op_types, fields.
let array = concat_arrays(batches.iter().map(|b| b.timestamps().to_arrow_array()))?;
builder.timestamps_array(array)?;
let array = concat_arrays(batches.iter().map(|b| b.sequences().to_arrow_array()))?;
builder.sequences_array(array)?;
let array = concat_arrays(batches.iter().map(|b| b.op_types().to_arrow_array()))?;
builder.op_types_array(array)?;
for (i, batch_column) in first.fields.iter().enumerate() {
let array = concat_arrays(batches.iter().map(|b| b.fields()[i].data.to_arrow_array()))?;
builder.push_field_array(batch_column.column_id, array)?;
}
builder.build()
}
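// Usage sketch (hypothetical `batch_a` and `batch_b` with the same primary key):
//
//     let merged = Batch::concat(vec![batch_a, batch_b])?;
//
// Rows keep their input order; `concat` does not sort or dedup.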
/// Removes rows whose op type is delete.
pub fn filter_deleted(&mut self) -> Result<()> {
// Safety: op type column is not null.
let array = self.op_types.as_arrow();
// Find rows with non-delete op type.
let predicate =
arrow::compute::neq_scalar(array, OpType::Delete as u8).context(ComputeArrowSnafu)?;
self.filter(&BooleanVector::from(predicate))
}
/// Applies the `predicate` to the batch.
// Safety: We know the array type so we unwrap on casting.
pub fn filter(&mut self, predicate: &BooleanVector) -> Result<()> {
self.timestamps = self
.timestamps
.filter(predicate)
.context(ComputeVectorSnafu)?;
self.sequences = Arc::new(
UInt64Vector::try_from_arrow_array(
arrow::compute::filter(self.sequences.as_arrow(), predicate.as_boolean_array())
.context(ComputeArrowSnafu)?,
)
.unwrap(),
);
self.op_types = Arc::new(
UInt8Vector::try_from_arrow_array(
arrow::compute::filter(self.op_types.as_arrow(), predicate.as_boolean_array())
.context(ComputeArrowSnafu)?,
)
.unwrap(),
);
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
.filter(predicate)
.context(ComputeVectorSnafu)?;
}
Ok(())
}
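// Example (hypothetical): keep only the last two rows of a four-row batch.
// The predicate must have exactly `num_rows()` elements:
//
//     let keep = BooleanVector::from_vec(vec![false, false, true, true]);
//     batch.filter(&keep)?;
//     assert_eq!(2, batch.num_rows());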
/// Sorts and dedups rows in the batch.
///
/// It orders rows by timestamp asc, sequence desc, and only keeps the
/// latest row for the same timestamp. It doesn't consider op type, as the
/// sequence should already provide uniqueness for a row.
pub fn sort_and_dedup(&mut self) -> Result<()> {
// If building a converter each time is costly, we may allow passing a
// converter.
let mut converter = RowConverter::new(vec![
SortField::new(self.timestamps.data_type().as_arrow_type()),
SortField::new_with_options(
self.sequences.data_type().as_arrow_type(),
SortOptions {
descending: true,
..Default::default()
},
),
])
.context(ComputeArrowSnafu)?;
// Columns to sort.
let columns = [
self.timestamps.to_arrow_array(),
self.sequences.to_arrow_array(),
];
let rows = converter.convert_columns(&columns).unwrap();
let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
to_sort.sort_unstable_by(|left, right| left.1.cmp(&right.1));
// Dedup by timestamps.
to_sort.dedup_by(|left, right| {
debug_assert_eq!(18, left.1.as_ref().len());
debug_assert_eq!(18, right.1.as_ref().len());
let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
// We only compare the timestamp part and ignore sequence.
left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
});
let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
self.take_in_place(&indices)
}
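// Worked example (hypothetical): timestamps [2, 3, 1, 2] with sequences
// [1, 2, 3, 6] sort to (1,3), (2,6), (2,1), (3,2) under (ts asc, seq desc);
// deduping on the timestamp prefix then drops (2,1), so timestamp 2 keeps
// its latest row, the one with sequence 6:
//
//     batch.sort_and_dedup()?;
//     // timestamps are now [1, 2, 3], sequences [3, 6, 2]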
/// Takes rows at `indices` from the batch, in place.
fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
let array = arrow::compute::take(self.sequences.as_arrow(), indices.as_arrow(), None)
.context(ComputeArrowSnafu)?;
// Safety: we know the array and vector type.
self.sequences = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
let array = arrow::compute::take(self.op_types.as_arrow(), indices.as_arrow(), None)
.context(ComputeArrowSnafu)?;
self.op_types = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
for batch_column in &mut self.fields {
batch_column.data = batch_column
.data
.take(indices)
.context(ComputeVectorSnafu)?;
}
Ok(())
}
/// Gets the timestamp at the given `index`.
///
/// # Panics
/// Panics if `index` is out-of-bound or the timestamp vector returns null.
fn get_timestamp(&self, index: usize) -> Timestamp {
match self.timestamps.get_ref(index) {
ValueRef::Timestamp(timestamp) => timestamp,
// Int64 is always millisecond.
// TODO(yingwen): Don't allow using int64 as time index.
ValueRef::Int64(v) => Timestamp::new_millisecond(v),
// We have checked that the data type is timestamp-compatible in the [BatchBuilder], so it's safe to panic.
value => panic!("{:?} is not a timestamp", value),
}
}
/// Gets the sequence at the given `index`.
///
/// # Panics
/// Panics if `index` is out-of-bound or the sequence vector returns null.
fn get_sequence(&self, index: usize) -> SequenceNumber {
// Safety: sequences is not null so it actually returns Some.
self.sequences.get_data(index).unwrap()
}
}
/// Length of an encoded timestamp in the arrow row format: one status byte
/// plus the 8-byte value, so each (timestamp, sequence) row is 18 bytes.
const TIMESTAMP_KEY_LEN: usize = 9;
/// Helper function to concatenate arrays from `iter`.
fn concat_arrays(iter: impl Iterator<Item = ArrayRef>) -> Result<ArrayRef> {
let arrays: Vec<_> = iter.collect();
let dyn_arrays: Vec<_> = arrays.iter().map(|array| array.as_ref()).collect();
arrow::compute::concat(&dyn_arrays).context(ComputeArrowSnafu)
}
/// A column in a [Batch].
@@ -226,6 +485,11 @@ impl BatchBuilder {
let op_types = self.op_types.context(InvalidBatchSnafu {
reason: "missing op_types",
})?;
// Our storage format ensures these columns are not nullable, so
// we assert here.
assert_eq!(0, timestamps.null_count());
assert_eq!(0, sequences.null_count());
assert_eq!(0, op_types.null_count());
let ts_len = timestamps.len();
ensure!(
@@ -336,3 +600,245 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
(**self).next_batch().await
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use super::*;
use crate::error::Error;
fn new_batch_builder(
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
field: &[u64],
) -> BatchBuilder {
let mut builder = BatchBuilder::new(b"test".to_vec());
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap()
.push_field_array(
1,
Arc::new(UInt64Array::from_iter_values(field.iter().copied())),
)
.unwrap();
builder
}
fn new_batch(
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
field: &[u64],
) -> Batch {
new_batch_builder(timestamps, sequences, op_types, field)
.build()
.unwrap()
}
#[test]
fn test_first_last_empty() {
let batch = new_batch(&[], &[], &[], &[]);
assert_eq!(None, batch.first_timestamp());
assert_eq!(None, batch.last_timestamp());
assert_eq!(None, batch.first_sequence());
assert_eq!(None, batch.last_sequence());
}
#[test]
fn test_first_last_one() {
let batch = new_batch(&[1], &[2], &[OpType::Put], &[4]);
assert_eq!(
Timestamp::new_millisecond(1),
batch.first_timestamp().unwrap()
);
assert_eq!(
Timestamp::new_millisecond(1),
batch.last_timestamp().unwrap()
);
assert_eq!(2, batch.first_sequence().unwrap());
assert_eq!(2, batch.last_sequence().unwrap());
}
#[test]
fn test_first_last_multiple() {
let batch = new_batch(
&[1, 2, 3],
&[11, 12, 13],
&[OpType::Put, OpType::Put, OpType::Put],
&[21, 22, 23],
);
assert_eq!(
Timestamp::new_millisecond(1),
batch.first_timestamp().unwrap()
);
assert_eq!(
Timestamp::new_millisecond(3),
batch.last_timestamp().unwrap()
);
assert_eq!(11, batch.first_sequence().unwrap());
assert_eq!(13, batch.last_sequence().unwrap());
}
#[test]
fn test_slice() {
let batch = new_batch(
&[1, 2, 3, 4],
&[11, 12, 13, 14],
&[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
&[21, 22, 23, 24],
);
let batch = batch.slice(1, 2);
let expect = new_batch(
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
);
assert_eq!(expect, batch);
}
#[test]
fn test_concat_empty() {
let err = Batch::concat(vec![]).unwrap_err();
assert!(
matches!(err, Error::InvalidBatch { .. }),
"unexpected err: {err}"
);
}
#[test]
fn test_concat_one() {
let batch = new_batch(&[], &[], &[], &[]);
let actual = Batch::concat(vec![batch.clone()]).unwrap();
assert_eq!(batch, actual);
let batch = new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]);
let actual = Batch::concat(vec![batch.clone()]).unwrap();
assert_eq!(batch, actual);
}
#[test]
fn test_concat_multiple() {
let batches = vec![
new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Put], &[21, 22]),
new_batch(
&[3, 4, 5],
&[13, 14, 15],
&[OpType::Put, OpType::Delete, OpType::Put],
&[23, 24, 25],
),
new_batch(&[], &[], &[], &[]),
new_batch(&[6], &[16], &[OpType::Put], &[26]),
];
let batch = Batch::concat(batches).unwrap();
let expect = new_batch(
&[1, 2, 3, 4, 5, 6],
&[11, 12, 13, 14, 15, 16],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Delete,
OpType::Put,
OpType::Put,
],
&[21, 22, 23, 24, 25, 26],
);
assert_eq!(expect, batch);
}
#[test]
fn test_concat_different() {
let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
let mut batch2 = new_batch(&[2], &[2], &[OpType::Put], &[2]);
batch2.primary_key = b"hello".to_vec();
let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
assert!(
matches!(err, Error::InvalidBatch { .. }),
"unexpected err: {err}"
);
}
#[test]
fn test_filter_deleted_empty() {
let mut batch = new_batch(&[], &[], &[], &[]);
batch.filter_deleted().unwrap();
assert!(batch.is_empty());
}
#[test]
fn test_filter_deleted() {
let mut batch = new_batch(
&[1, 2, 3, 4],
&[11, 12, 13, 14],
&[OpType::Delete, OpType::Put, OpType::Delete, OpType::Put],
&[21, 22, 23, 24],
);
batch.filter_deleted().unwrap();
let expect = new_batch(&[2, 4], &[12, 14], &[OpType::Put, OpType::Put], &[22, 24]);
assert_eq!(expect, batch);
}
#[test]
fn test_filter() {
let mut batch = new_batch(
&[1, 2, 3, 4],
&[11, 12, 13, 14],
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
&[21, 22, 23, 24],
);
let predicate = BooleanVector::from_vec(vec![false, false, true, true]);
batch.filter(&predicate).unwrap();
let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
assert_eq!(expect, batch);
// filter to empty.
let predicate = BooleanVector::from_vec(vec![false, false]);
batch.filter(&predicate).unwrap();
assert!(batch.is_empty());
}
#[test]
fn test_sort_and_dedup() {
let mut batch = new_batch(
&[2, 3, 1, 4, 5, 2],
&[1, 2, 3, 4, 5, 6],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[21, 22, 23, 24, 25, 26],
);
batch.sort_and_dedup().unwrap();
// It should keep only one row for timestamp 2 (the one with the larger sequence).
let expect = new_batch(
&[1, 2, 3, 4, 5],
&[3, 6, 2, 4, 5],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[23, 26, 22, 24, 25],
);
assert_eq!(expect, batch);
}
}
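Taken together, the new operations compose; a minimal end-to-end sketch (reusing the `new_batch` test helper above, error handling elided):

    let mut batch = Batch::concat(vec![
        new_batch(&[1, 2], &[11, 12], &[OpType::Put, OpType::Delete], &[21, 22]),
        new_batch(&[2], &[13], &[OpType::Put], &[23]),
    ])?;
    batch.sort_and_dedup()?; // timestamp 2 keeps sequence 13, dropping the Delete row
    batch.filter_deleted()?; // no Delete rows remain, so this is a no-op
    assert_eq!(2, batch.num_rows());
    let last = batch.slice(1, 1);
    assert_eq!(Some(Timestamp::new_millisecond(2)), last.first_timestamp());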

View File

@@ -341,6 +341,7 @@ fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
#[cfg(test)]
mod tests {
use api::v1::OpType;
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::datatypes::TimeUnit;
use datatypes::prelude::ConcreteDataType;
@@ -352,7 +353,7 @@ mod tests {
use super::*;
const TEST_SEQUENCE: u64 = 1;
const TEST_OP_TYPE: u8 = 1;
const TEST_OP_TYPE: u8 = OpType::Put as u8;
fn build_test_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));

View File

@@ -255,7 +255,7 @@ pub trait BatchReader: Send {
/// Returns `Ok(None)` when the reader has reached its end and further calls
/// to `next_batch()` won't return any batch.
///
/// If `Err` is returned, caller should not call this method again, the implementor
/// If `Err` is returned, caller **must** not call this method again, the implementor
/// may or may not panic in such case.
async fn next_batch(&mut self) -> Result<Option<Batch>>;
}
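A typical consumption loop under this contract (sketch; `reader` is any `BatchReader` and `process` is a hypothetical handler):

    while let Some(batch) = reader.next_batch().await? {
        // On `Err`, the `?` operator propagates it and we never poll again.
        process(batch);
    }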