diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 3c22de938d..54355328e0 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -366,12 +366,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to compact values, source: {}, location: {}", source, location))]
-    CompactValues {
-        source: datatypes::error::Error,
-        location: Location,
-    },
-
     #[snafu(display("Invalid flume sender, location: {}", location,))]
     InvalidFlumeSender { location: Location },
 
@@ -440,7 +434,6 @@ impl ErrorExt for Error {
             ComputeArrow { .. } => StatusCode::Internal,
             ComputeVector { .. } => StatusCode::Internal,
             PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments,
-            CompactValues { source, .. } => source.status_code(),
             InvalidFlumeSender { .. } => StatusCode::InvalidArguments,
             InvalidSchedulerState { .. } => StatusCode::InvalidArguments,
             StopScheduler { .. } => StatusCode::Internal,
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 2f74b9610c..940028c085 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -13,20 +13,24 @@
 // limitations under the License.
 
 use std::collections::btree_map::Entry;
-use std::collections::{BTreeMap, Bound};
+use std::collections::{BTreeMap, Bound, HashSet};
 use std::fmt::{Debug, Formatter};
 use std::sync::{Arc, RwLock};
 
 use api::v1::OpType;
+use datatypes::arrow;
+use datatypes::arrow::array::ArrayRef;
 use datatypes::data_type::DataType;
 use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
 use datatypes::value::ValueRef;
-use datatypes::vectors::{UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder};
+use datatypes::vectors::{
+    Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder,
+};
 use snafu::{ensure, ResultExt};
 use store_api::metadata::RegionMetadataRef;
-use store_api::storage::ScanRequest;
+use store_api::storage::{ColumnId, ScanRequest};
 
-use crate::error::{CompactValuesSnafu, PrimaryKeyLengthMismatchSnafu, Result};
+use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
 use crate::memtable::{BoxedBatchIterator, KeyValues, Memtable, MemtableId};
 use crate::read::{Batch, BatchBuilder, BatchColumn};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -90,13 +94,19 @@ impl Memtable for TimeSeriesMemtable {
     }
 
     fn iter(&self, req: ScanRequest) -> BoxedBatchIterator {
-        let _projection = req.projection.map(|p| {
-            p.iter()
+        let projection = if let Some(projection) = &req.projection {
+            projection
+                .iter()
                 .map(|idx| self.region_metadata.column_metadatas[*idx].column_id)
-                .collect::<Vec<_>>()
-        });
+                .collect()
+        } else {
+            self.region_metadata
+                .field_columns()
+                .map(|c| c.column_id)
+                .collect()
+        };
 
-        Box::new(self.series_set.iter_series())
+        Box::new(self.series_set.iter_series(projection))
     }
 }
 
@@ -135,10 +145,11 @@ impl SeriesSet {
     }
     /// Iterates all series in [SeriesSet].
-    fn iter_series(&self) -> Iter {
+    fn iter_series(&self, projection: HashSet<ColumnId>) -> Iter {
         Iter {
             metadata: self.region_metadata.clone(),
             series: self.series.clone(),
+            projection,
             last_key: None,
         }
     }
 }
@@ -147,6 +158,7 @@
 struct Iter {
     metadata: RegionMetadataRef,
     series: Arc<SeriesRwLockMap>,
+    projection: HashSet<ColumnId>,
     last_key: Option<Vec<u8>>,
 }
@@ -165,7 +177,7 @@ impl Iterator for Iter {
         if let Some((primary_key, series)) = range.next() {
             self.last_key = Some(primary_key.clone());
             let values = series.write().unwrap().compact(&self.metadata);
-            Some(values.and_then(|v| v.to_batch(primary_key, &self.metadata)))
+            Some(values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)))
         } else {
             None
         }
@@ -237,39 +249,36 @@ impl Series {
         self.freeze(region_metadata);
 
         let mut frozen = self.frozen.clone();
+
+        // Each series must contain at least one row
+        debug_assert!(!frozen.is_empty());
+
         let values = if frozen.len() == 1 {
             frozen.pop().unwrap()
         } else {
             // TODO(hl): We should keep track of min/max timestamps for each values and avoid
             // cloning and sorting when values do not overlap with each other.
-            let total_len: usize = frozen.iter().map(|v| v.timestamp.len()).sum();
-            let mut builder = ValueBuilder::new(region_metadata, total_len);
+            let column_size = frozen[0].fields.len() + 3;
 
-            for v in frozen {
-                let len = v.timestamp.len();
-                builder
-                    .timestamp
-                    .extend_slice_of(&*v.timestamp, 0, len)
-                    .context(CompactValuesSnafu)?;
-                builder
-                    .sequence
-                    .extend_slice_of(&*v.sequence, 0, len)
-                    .context(CompactValuesSnafu)?;
-
-                builder
-                    .op_type
-                    .extend_slice_of(&*v.op_type, 0, len)
-                    .context(CompactValuesSnafu)?;
-
-                for (idx, f) in v.fields.iter().enumerate() {
-                    builder.fields[idx]
-                        .extend_slice_of(&**f, 0, len)
-                        .context(CompactValuesSnafu)?;
-                }
+            if cfg!(debug_assertions) {
+                debug_assert!(frozen
+                    .iter()
+                    .zip(frozen.iter().skip(1))
+                    .all(|(prev, next)| { prev.fields.len() == next.fields.len() }));
             }
 
-            let values = Values::from(builder);
+            let arrays = frozen.iter().map(|v| v.columns()).collect::<Vec<_>>();
+            let concatenated = (0..column_size)
+                .map(|i| {
+                    let to_concat = arrays.iter().map(|a| a[i].as_ref()).collect::<Vec<_>>();
+                    arrow::compute::concat(&to_concat)
+                })
+                .collect::<Result<Vec<_>, _>>()
+                .context(ComputeArrowSnafu)?;
+
+            debug_assert_eq!(concatenated.len(), column_size);
+            let values = Values::from_columns(&concatenated)?;
             self.frozen = vec![values.clone()];
             values
         };
@@ -322,14 +331,14 @@ impl ValueBuilder {
 
     /// Returns the length of [ValueBuilder]
     fn len(&self) -> usize {
-        let timestamp_len = self.timestamp.len();
-        debug_assert_eq!(timestamp_len, self.op_type.len());
-        debug_assert_eq!(timestamp_len, self.sequence.len());
-        timestamp_len
+        let sequence_len = self.sequence.len();
+        debug_assert_eq!(sequence_len, self.op_type.len());
+        debug_assert_eq!(sequence_len, self.timestamp.len());
+        sequence_len
     }
 }
 
-/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_typee`.
+/// [Values] holds immutable vectors of field columns, including `sequence` and `op_type`.
 #[derive(Clone)]
 struct Values {
     timestamp: VectorRef,
@@ -341,7 +350,12 @@ struct Values {
 impl Values {
     /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and
     /// keeps only the latest row for the same timestamp.
-    pub fn to_batch(&self, primary_key: &[u8], metadata: &RegionMetadataRef) -> Result<Batch> {
+    pub fn to_batch(
+        &self,
+        primary_key: &[u8],
+        metadata: &RegionMetadataRef,
+        projection: &HashSet<ColumnId>,
+    ) -> Result<Batch> {
         let builder = BatchBuilder::with_required_columns(
             primary_key.to_vec(),
             self.timestamp.clone(),
@@ -352,9 +366,11 @@ impl Values {
         let fields = metadata
             .field_columns()
             .zip(self.fields.iter())
-            .map(|(c, f)| BatchColumn {
-                column_id: c.column_id,
-                data: f.clone(),
+            .filter_map(|(c, f)| {
+                projection.get(&c.column_id).map(|c| BatchColumn {
+                    column_id: *c,
+                    data: f.clone(),
+                })
             })
             .collect();
 
@@ -362,6 +378,34 @@ impl Values {
         batch.sort_and_dedup()?;
         Ok(batch)
     }
+
+    /// Returns a vector of all columns converted to arrow [Array] in [Values].
+    fn columns(&self) -> Vec<ArrayRef> {
+        let mut res = Vec::with_capacity(3 + self.fields.len());
+        res.push(self.timestamp.to_arrow_array());
+        res.push(self.sequence.to_arrow_array());
+        res.push(self.op_type.to_arrow_array());
+        res.extend(self.fields.iter().map(|f| f.to_arrow_array()));
+        res
+    }
+
+    /// Builds a new [Values] instance from columns.
+    fn from_columns(cols: &[ArrayRef]) -> Result<Self> {
+        debug_assert!(cols.len() >= 3);
+        let timestamp = Helper::try_into_vector(&cols[0]).context(ConvertVectorSnafu)?;
+        let sequence =
+            Arc::new(UInt64Vector::try_from_arrow_array(&cols[1]).context(ConvertVectorSnafu)?);
+        let op_type =
+            Arc::new(UInt8Vector::try_from_arrow_array(&cols[2]).context(ConvertVectorSnafu)?);
+        let fields = Helper::try_into_vectors(&cols[3..]).context(ConvertVectorSnafu)?;
+
+        Ok(Self {
+            timestamp,
+            sequence,
+            op_type,
+            fields,
+        })
+    }
 }
 
 impl From<ValueBuilder> for Values {
@@ -545,7 +589,9 @@ mod tests {
             fields,
         };
 
-        let batch = values.to_batch(b"test", &schema).unwrap();
+        let batch = values
+            .to_batch(b"test", &schema, &[0, 1, 2, 3, 4].into_iter().collect())
+            .unwrap();
         check_value(
             &batch,
             vec![
@@ -581,7 +627,7 @@ mod tests {
         )
     }
 
-    fn build_key_values(schema: &RegionMetadataRef, len: usize) -> KeyValues {
+    fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
         let column_schema = schema
             .column_metadatas
             .iter()
@@ -598,10 +644,10 @@
             .map(|i| Row {
                 values: vec![
                     api::v1::Value {
-                        value_data: Some(ValueData::StringValue(i.to_string())),
+                        value_data: Some(ValueData::StringValue(k0.clone())),
                    },
                    api::v1::Value {
-                        value_data: Some(ValueData::I64Value(i as i64)),
+                        value_data: Some(ValueData::I64Value(k1)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TsMillisecondValue(i as i64)),
                    },
@@ -702,7 +748,7 @@
     fn test_memtable() {
         common_telemetry::init_default_ut_logging();
         let schema = schema_for_test();
-        let kvs = build_key_values(&schema, 100);
+        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
         let memtable = TimeSeriesMemtable::new(schema, 42).unwrap();
         memtable.write(&kvs).unwrap();
 
@@ -728,4 +774,35 @@
             .collect::<HashSet<_>>();
         assert_eq!(expected_ts, read);
     }
+
+    #[test]
+    fn test_memtable_projection() {
+        common_telemetry::init_default_ut_logging();
+        let schema = schema_for_test();
+        let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
+        let memtable = TimeSeriesMemtable::new(schema, 42).unwrap();
+        memtable.write(&kvs).unwrap();
+
+        let iter = memtable.iter(ScanRequest {
+            projection: Some(vec![3]), // k0, k1, ts, v0, v1, only take v0
+            ..Default::default()
+        });
+
+        let mut v0_all = vec![];
+
+        for res in iter {
+            let batch = res.unwrap();
+            assert_eq!(1, batch.fields().len());
+            let v0 = batch
+                .fields()
+                .get(0)
+                .unwrap()
+                .data
+                .as_any()
+                .downcast_ref::<Int64Vector>()
+                .unwrap();
+            v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
+        }
+        assert_eq!((0..100i64).collect::<Vec<_>>(), v0_all);
+    }
 }
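
Reviewer note: the core behavioral change is the projection resolution in `TimeSeriesMemtable::iter`. Below is a minimal, self-contained Rust sketch of that mapping: positional `ScanRequest` indices are resolved to stable column ids, and an absent projection falls back to all field columns. `ColumnMeta`, `RegionMeta`, and `resolve_projection` are illustrative stand-ins for the crate's `ColumnMetadata`/`RegionMetadata`, not its real API.

```rust
use std::collections::HashSet;

type ColumnId = u32;

// Illustrative stand-ins for the crate's RegionMetadata / ColumnMetadata.
struct ColumnMeta {
    column_id: ColumnId,
    is_field: bool,
}

struct RegionMeta {
    column_metadatas: Vec<ColumnMeta>,
}

impl RegionMeta {
    /// Mirrors the diff's `TimeSeriesMemtable::iter`: positional projection
    /// indices are mapped to column ids; an absent projection defaults to
    /// every field column.
    fn resolve_projection(&self, projection: Option<&[usize]>) -> HashSet<ColumnId> {
        match projection {
            Some(indices) => indices
                .iter()
                .map(|idx| self.column_metadatas[*idx].column_id)
                .collect(),
            None => self
                .column_metadatas
                .iter()
                .filter(|c| c.is_field)
                .map(|c| c.column_id)
                .collect(),
        }
    }
}

fn main() {
    // Same layout as the test schema: k0, k1, ts, v0, v1.
    let meta = RegionMeta {
        column_metadatas: vec![
            ColumnMeta { column_id: 0, is_field: false }, // k0
            ColumnMeta { column_id: 1, is_field: false }, // k1
            ColumnMeta { column_id: 2, is_field: false }, // ts
            ColumnMeta { column_id: 3, is_field: true },  // v0
            ColumnMeta { column_id: 4, is_field: true },  // v1
        ],
    };
    // Index 3 selects v0 only, as in test_memtable_projection.
    assert_eq!(meta.resolve_projection(Some(&[3])), HashSet::from([3]));
    // No projection: all field columns survive.
    assert_eq!(meta.resolve_projection(None), HashSet::from([3, 4]));
}
```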
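
The other change worth calling out is `Series::compact` switching from row-by-row `extend_slice_of` copies into a `ValueBuilder` to one arrow `concat` per column. A standalone sketch of that column-wise merge follows; `concat_columns` is a hypothetical helper (the real code works on `Values` and converts back via `Values::from_columns`), but `arrow::compute::concat` is the actual kernel the diff calls.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, UInt64Array};
use arrow::compute::concat;
use arrow::error::ArrowError;

/// Column-wise concatenation of several frozen value sets, in the spirit of
/// the new `Series::compact`: one `concat` kernel call per column instead of
/// row-by-row builder pushes.
fn concat_columns(batches: &[Vec<ArrayRef>]) -> Result<Vec<ArrayRef>, ArrowError> {
    let column_size = batches[0].len();
    // All frozen batches of one series share the same column layout.
    debug_assert!(batches.iter().all(|b| b.len() == column_size));
    (0..column_size)
        .map(|i| {
            let to_concat = batches.iter().map(|b| b[i].as_ref()).collect::<Vec<_>>();
            concat(&to_concat)
        })
        .collect()
}

fn main() -> Result<(), ArrowError> {
    // Two frozen batches with the same layout: (timestamp, sequence).
    let a: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![1, 2])),
        Arc::new(UInt64Array::from(vec![10, 11])),
    ];
    let b: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![3])),
        Arc::new(UInt64Array::from(vec![12])),
    ];
    let merged = concat_columns(&[a, b])?;
    assert_eq!(merged[0].len(), 3);
    assert_eq!(merged[1].len(), 3);
    Ok(())
}
```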