feat(mito2): concat and projection (#2243)

* refactor: use arrow::compute::concat instead of pushing values into vector builders

* feat: support projection

* refactor: remove sequence

* refactor: concatenate

* fix: series must not be empty

* refactor: projection
Author: Lei, HUANG
Date: 2023-08-25 11:25:27 +08:00
Committed by: GitHub
Parent: 20b7f907b2
Commit: dbe0e95f2f
2 changed files with 126 additions and 56 deletions
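The first bullet is the core of the change: rather than appending each frozen batch into mutable vector builders slice by slice, `Series::compact` now converts every batch to arrow arrays once and hands them to the `concat` kernel. A minimal sketch of that contrast using upstream arrow APIs directly (illustrative only; the real code operates on GreptimeDB's vector types, not raw `Int64Array`s):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array, Int64Builder};

fn main() {
    let a = Int64Array::from(vec![1, 2, 3]);
    let b = Int64Array::from(vec![4, 5]);

    // Before: copy values through a mutable builder, one slice at a time.
    let mut builder = Int64Builder::new();
    builder.append_slice(a.values());
    builder.append_slice(b.values());
    let via_builder: ArrayRef = Arc::new(builder.finish());

    // After: let the concat kernel do the buffer-level copy in one call.
    let via_concat = arrow::compute::concat(&[&a as &dyn Array, &b]).unwrap();

    assert_eq!(via_builder.len(), via_concat.len());
}
```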

src/mito2/src/error.rs

@@ -366,12 +366,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to compact values, source: {}, location: {}", source, location))]
-    CompactValues {
-        source: datatypes::error::Error,
-        location: Location,
-    },
 
     #[snafu(display("Invalid flume sender, location: {}", location,))]
     InvalidFlumeSender { location: Location },
@@ -440,7 +434,6 @@ impl ErrorExt for Error {
             ComputeArrow { .. } => StatusCode::Internal,
             ComputeVector { .. } => StatusCode::Internal,
             PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
-            CompactValues { source, .. } => source.status_code(),
             InvalidFlumeSender { .. } => StatusCode::InvalidArguments,
             InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
             StopScheduler { .. } => StatusCode::Internal,
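With the builder path gone, nothing produces a `datatypes::error::Error` during compaction anymore: concatenation failures surface through the existing `ComputeArrow` variant instead, so `CompactValues` and its status-code mapping are dead code. A hedged sketch of the snafu pattern at play, with an illustrative stand-in enum rather than the crate's real `Error`:

```rust
use arrow::array::{Array, ArrayRef};
use snafu::{ResultExt, Snafu};

// Illustrative stand-in: one variant now covers the only fallible step.
#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to compute arrow arrays, source: {}", source))]
    ComputeArrow { source: arrow::error::ArrowError },
}

// Errors from the concat kernel are wrapped via the generated context
// selector, the same way the memtable code uses ComputeArrowSnafu.
fn concat_columns(parts: &[&dyn Array]) -> Result<ArrayRef, Error> {
    arrow::compute::concat(parts).context(ComputeArrowSnafu)
}
```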

src/mito2/src/memtable/time_series.rs

@@ -13,20 +13,24 @@
 // limitations under the License.
 
 use std::collections::btree_map::Entry;
-use std::collections::{BTreeMap, Bound};
+use std::collections::{BTreeMap, Bound, HashSet};
 use std::fmt::{Debug, Formatter};
 use std::sync::{Arc, RwLock};
 
 use api::v1::OpType;
+use datatypes::arrow;
+use datatypes::arrow::array::ArrayRef;
 use datatypes::data_type::DataType;
 use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
 use datatypes::value::ValueRef;
-use datatypes::vectors::{UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder};
+use datatypes::vectors::{
+    Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder,
+};
 use snafu::{ensure, ResultExt};
 use store_api::metadata::RegionMetadataRef;
-use store_api::storage::ScanRequest;
+use store_api::storage::{ColumnId, ScanRequest};
 
-use crate::error::{CompactValuesSnafu, PrimaryKeyLengthMismatchSnafu, Result};
+use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
 use crate::memtable::{BoxedBatchIterator, KeyValues, Memtable, MemtableId};
 use crate::read::{Batch, BatchBuilder, BatchColumn};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -90,13 +94,19 @@ impl Memtable for TimeSeriesMemtable {
     }
 
     fn iter(&self, req: ScanRequest) -> BoxedBatchIterator {
-        let _projection = req.projection.map(|p| {
-            p.iter()
+        let projection = if let Some(projection) = &req.projection {
+            projection
+                .iter()
                 .map(|idx| self.region_metadata.column_metadatas[*idx].column_id)
-                .collect::<Vec<_>>()
-        });
+                .collect()
+        } else {
+            self.region_metadata
+                .field_columns()
+                .map(|c| c.column_id)
+                .collect()
+        };
 
-        Box::new(self.series_set.iter_series())
+        Box::new(self.series_set.iter_series(projection))
     }
 }
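`ScanRequest::projection` is positional (indices into `column_metadatas`), while series data is addressed by `ColumnId`, so `iter` resolves indices to ids up front and defaults to all field columns when no projection is given. A self-contained sketch of that resolution step; `ColumnMeta`/`RegionMeta` are simplified hypothetical stand-ins for the store-api metadata types:

```rust
use std::collections::HashSet;

// Hypothetical, pared-down metadata types for illustration.
struct ColumnMeta {
    column_id: u32,
    is_field: bool,
}
struct RegionMeta {
    column_metadatas: Vec<ColumnMeta>,
}

// Resolve positional projection indices to ColumnIds; without a
// projection, fall back to every field column.
fn resolve_projection(meta: &RegionMeta, projection: Option<&[usize]>) -> HashSet<u32> {
    match projection {
        Some(indices) => indices
            .iter()
            .map(|idx| meta.column_metadatas[*idx].column_id)
            .collect(),
        None => meta
            .column_metadatas
            .iter()
            .filter(|c| c.is_field)
            .map(|c| c.column_id)
            .collect(),
    }
}
```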
@@ -135,10 +145,11 @@ impl SeriesSet {
     }
 
     /// Iterates all series in [SeriesSet].
-    fn iter_series(&self) -> Iter {
+    fn iter_series(&self, projection: HashSet<ColumnId>) -> Iter {
         Iter {
             metadata: self.region_metadata.clone(),
             series: self.series.clone(),
+            projection,
             last_key: None,
         }
     }
@@ -147,6 +158,7 @@ impl SeriesSet {
 struct Iter {
     metadata: RegionMetadataRef,
     series: Arc<SeriesRwLockMap>,
+    projection: HashSet<ColumnId>,
     last_key: Option<Vec<u8>>,
 }
@@ -165,7 +177,7 @@ impl Iterator for Iter {
         if let Some((primary_key, series)) = range.next() {
             self.last_key = Some(primary_key.clone());
             let values = series.write().unwrap().compact(&self.metadata);
-            Some(values.and_then(|v| v.to_batch(primary_key, &self.metadata)))
+            Some(values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)))
         } else {
             None
         }
@@ -237,39 +249,36 @@ impl Series {
         self.freeze(region_metadata);
 
         let mut frozen = self.frozen.clone();
 
+        // Each series must contain at least one row
+        debug_assert!(!frozen.is_empty());
+
         let values = if frozen.len() == 1 {
             frozen.pop().unwrap()
         } else {
             // TODO(hl): We should keep track of min/max timestamps for each values and avoid
             // cloning and sorting when values do not overlap with each other.
-            let total_len: usize = frozen.iter().map(|v| v.timestamp.len()).sum();
-            let mut builder = ValueBuilder::new(region_metadata, total_len);
+            let column_size = frozen[0].fields.len() + 3;
 
-            for v in frozen {
-                let len = v.timestamp.len();
-                builder
-                    .timestamp
-                    .extend_slice_of(&*v.timestamp, 0, len)
-                    .context(CompactValuesSnafu)?;
-                builder
-                    .sequence
-                    .extend_slice_of(&*v.sequence, 0, len)
-                    .context(CompactValuesSnafu)?;
-                builder
-                    .op_type
-                    .extend_slice_of(&*v.op_type, 0, len)
-                    .context(CompactValuesSnafu)?;
-                for (idx, f) in v.fields.iter().enumerate() {
-                    builder.fields[idx]
-                        .extend_slice_of(&**f, 0, len)
-                        .context(CompactValuesSnafu)?;
-                }
+            if cfg!(debug_assertions) {
+                debug_assert!(frozen
+                    .iter()
+                    .zip(frozen.iter().skip(1))
+                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
             }
 
-            let values = Values::from(builder);
+            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
+            let concatenated = (0..column_size)
+                .map(|i| {
+                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
+                    arrow::compute::concat(&to_concat)
+                })
+                .collect::<std::result::Result<Vec<_>, _>>()
+                .context(ComputeArrowSnafu)?;
+            debug_assert_eq!(concatenated.len(), column_size);
+            let values = Values::from_columns(&concatenated)?;
             self.frozen = vec![values.clone()];
             values
         };
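The rewritten branch works column-major: each frozen `Values` becomes a `Vec<ArrayRef>` laid out as timestamp, sequence, op_type, then fields (hence `fields.len() + 3`), and column `i` of every batch is concatenated into a single array, with the per-column `Result`s collected into one. A runnable sketch of the same loop over plain arrow arrays (the two-batch, two-column setup is illustrative):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array, UInt64Array};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Two "frozen" batches sharing the same column layout.
    let batch_a: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![1, 2])),    // timestamp
        Arc::new(UInt64Array::from(vec![10, 11])), // sequence
    ];
    let batch_b: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![3])),
        Arc::new(UInt64Array::from(vec![12])),
    ];
    let batches = [batch_a, batch_b];
    let column_size = batches[0].len();

    // For each column index, gather that column from every batch and
    // concatenate; collect the per-column Results into a single Result.
    let concatenated = (0..column_size)
        .map(|i| {
            let to_concat = batches.iter().map(|b| b[i].as_ref()).collect::<Vec<_>>();
            arrow::compute::concat(&to_concat)
        })
        .collect::<Result<Vec<_>, _>>()?;

    assert_eq!(concatenated.len(), column_size);
    assert_eq!(concatenated[0].len(), 3); // 2 + 1 rows per column
    Ok(())
}
```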
@@ -322,14 +331,14 @@ impl ValueBuilder {
     /// Returns the length of [ValueBuilder]
     fn len(&self) -> usize {
-        let timestamp_len = self.timestamp.len();
-        debug_assert_eq!(timestamp_len, self.op_type.len());
-        debug_assert_eq!(timestamp_len, self.sequence.len());
-        timestamp_len
+        let sequence_len = self.sequence.len();
+        debug_assert_eq!(sequence_len, self.op_type.len());
+        debug_assert_eq!(sequence_len, self.timestamp.len());
+        sequence_len
     }
 }
 
-/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_typee`.
+/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_type`.
 #[derive(Clone)]
 struct Values {
     timestamp: VectorRef,
@@ -341,7 +350,12 @@ struct Values {
 impl Values {
     /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
     /// keeps only the latest row for the same timestamp.
-    pub fn to_batch(&self, primary_key: &[u8], metadata: &RegionMetadataRef) -> Result<Batch> {
+    pub fn to_batch(
+        &self,
+        primary_key: &[u8],
+        metadata: &RegionMetadataRef,
+        projection: &HashSet<ColumnId>,
+    ) -> Result<Batch> {
         let builder = BatchBuilder::with_required_columns(
             primary_key.to_vec(),
             self.timestamp.clone(),
@@ -352,9 +366,11 @@ impl Values {
         let fields = metadata
             .field_columns()
             .zip(self.fields.iter())
-            .map(|(c, f)| BatchColumn {
-                column_id: c.column_id,
-                data: f.clone(),
+            .filter_map(|(c, f)| {
+                projection.get(&c.column_id).map(|c| BatchColumn {
+                    column_id: *c,
+                    data: f.clone(),
+                })
             })
             .collect();
@@ -362,6 +378,34 @@ impl Values {
         batch.sort_and_dedup()?;
         Ok(batch)
     }
+
+    /// Returns a vector of all columns converted to arrow [Array] in [Values].
+    fn columns(&self) -> Vec<ArrayRef> {
+        let mut res = Vec::with_capacity(3 + self.fields.len());
+        res.push(self.timestamp.to_arrow_array());
+        res.push(self.sequence.to_arrow_array());
+        res.push(self.op_type.to_arrow_array());
+        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
+        res
+    }
+
+    /// Builds a new [Values] instance from columns.
+    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
+        debug_assert!(cols.len() >= 3);
+        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
+        let sequence =
+            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
+        let op_type =
+            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
+        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;
+
+        Ok(Self {
+            timestamp,
+            sequence,
+            op_type,
+            fields,
+        })
+    }
 }
 
 impl From<ValueBuilder> for Values {
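`columns()` and `from_columns` pin down the positional contract the concat loop relies on: slot 0 is timestamp, 1 is sequence, 2 is op_type, and everything from index 3 onward is a field column, which is also why `from_columns` asserts `cols.len() >= 3`. A sketch of that round-trip with plain arrow arrays; the `Columns` struct is a simplified stand-in for `Values`:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, UInt64Array, UInt8Array};

// Simplified stand-in: three fixed columns, then a variable tail of fields.
struct Columns {
    timestamp: ArrayRef,
    sequence: ArrayRef,
    op_type: ArrayRef,
    fields: Vec<ArrayRef>,
}

impl Columns {
    // Flatten into the positional layout [timestamp, sequence, op_type, fields...].
    fn to_vec(&self) -> Vec<ArrayRef> {
        let mut out = Vec::with_capacity(3 + self.fields.len());
        out.push(self.timestamp.clone());
        out.push(self.sequence.clone());
        out.push(self.op_type.clone());
        out.extend(self.fields.iter().cloned());
        out
    }

    // Rebuild from the same layout; the three fixed slots must be present.
    fn from_vec(cols: Vec<ArrayRef>) -> Self {
        assert!(cols.len() >= 3);
        let mut cols = cols.into_iter();
        Self {
            timestamp: cols.next().unwrap(),
            sequence: cols.next().unwrap(),
            op_type: cols.next().unwrap(),
            fields: cols.collect(),
        }
    }
}

fn main() {
    let v = Columns {
        timestamp: Arc::new(Int64Array::from(vec![1, 2, 3])),
        sequence: Arc::new(UInt64Array::from(vec![7, 8, 9])),
        op_type: Arc::new(UInt8Array::from(vec![1, 1, 1])),
        fields: vec![Arc::new(Int64Array::from(vec![4, 5, 6]))],
    };
    let round_trip = Columns::from_vec(v.to_vec());
    assert_eq!(round_trip.fields.len(), 1);
}
```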
@@ -545,7 +589,9 @@ mod tests {
             fields,
         };
-        let batch = values.to_batch(b"test", &schema).unwrap();
+        let batch = values
+            .to_batch(b"test", &schema, &[0, 1, 2, 3, 4].into_iter().collect())
+            .unwrap();
         check_value(
             &batch,
             vec![
@@ -581,7 +627,7 @@ mod tests {
         )
     }
 
-    fn build_key_values(schema: &RegionMetadataRef, len: usize) -> KeyValues {
+    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
         let column_schema = schema
             .column_metadatas
             .iter()
@@ -598,10 +644,10 @@ mod tests {
             .map(|i| Row {
                 values: vec![
                     api::v1::Value {
-                        value_data: Some(ValueData::StringValue(i.to_string())),
+                        value_data: Some(ValueData::StringValue(k0.clone())),
                     },
                     api::v1::Value {
-                        value_data: Some(ValueData::I64Value(i as i64)),
+                        value_data: Some(ValueData::I64Value(k1)),
                     },
                     api::v1::Value {
                         value_data: Some(ValueData::TsMillisecondValue(i as i64)),
@@ -702,7 +748,7 @@ mod tests {
     fn test_memtable() {
         common_telemetry::init_default_ut_logging();
         let schema = schema_for_test();
-        let kvs = build_key_values(&schema, 100);
+        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
         let memtable = TimeSeriesMemtable::new(schema, 42).unwrap();
 
         memtable.write(&kvs).unwrap();
@@ -728,4 +774,35 @@ mod tests {
             .collect::<HashSet<_>>();
         assert_eq!(expected_ts, read);
     }
+
+    #[test]
+    fn test_memtable_projection() {
+        common_telemetry::init_default_ut_logging();
+        let schema = schema_for_test();
+        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
+        let memtable = TimeSeriesMemtable::new(schema, 42).unwrap();
+        memtable.write(&kvs).unwrap();
+
+        let iter = memtable.iter(ScanRequest {
+            projection: Some(vec![3]), // k0, k1, ts, v0, v1, only take v0
+            ..Default::default()
+        });
+
+        let mut v0_all = vec![];
+        for res in iter {
+            let batch = res.unwrap();
+            assert_eq!(1, batch.fields().len());
+            let v0 = batch
+                .fields()
+                .get(0)
+                .unwrap()
+                .data
+                .as_any()
+                .downcast_ref::<Int64Vector>()
+                .unwrap();
+            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
+        }
+        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
+    }
 }