diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 553106a61d..3c22de938d 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -366,12 +366,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to sort values source: {}, location: {}", source, location))] - SortValues { - source: ArrowError, - location: Location, - }, - #[snafu(display("Failed to compact values, source: {}, location: {}", source, location))] CompactValues { source: datatypes::error::Error, @@ -446,7 +440,6 @@ impl ErrorExt for Error { ComputeArrow { .. } => StatusCode::Internal, ComputeVector { .. } => StatusCode::Internal, PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, - SortValues { .. } => StatusCode::Unexpected, CompactValues { source, .. } => source.status_code(), InvalidFlumeSender { .. } => StatusCode::InvalidArguments, InvalidSchedulerState { .. } => StatusCode::InvalidArguments, diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index f2a0094a6b..2f74b9610c 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -18,21 +18,15 @@ use std::fmt::{Debug, Formatter}; use std::sync::{Arc, RwLock}; use api::v1::OpType; -use datatypes::arrow; -use datatypes::arrow::row::RowConverter; use datatypes::data_type::DataType; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef}; use datatypes::value::ValueRef; -use datatypes::vectors::{ - Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, -}; +use datatypes::vectors::{UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder}; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::ScanRequest; -use crate::error::{ - CompactValuesSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result, SortValuesSnafu, -}; +use crate::error::{CompactValuesSnafu, PrimaryKeyLengthMismatchSnafu, Result}; use crate::memtable::{BoxedBatchIterator, KeyValues, Memtable, MemtableId}; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; @@ -345,46 +339,8 @@ struct Values { } impl Values { - /// Sorts values in place by `timestamp, sequence, op_type`. - fn sort_in_place(&mut self) -> Result<()> { - let mut arrays = Vec::with_capacity(3 + self.fields.len()); - arrays.push(self.timestamp.to_arrow_array()); - arrays.push(self.sequence.to_arrow_array()); - arrays.push(self.op_type.to_arrow_array()); - arrays.extend(self.fields.iter().map(|f| f.to_arrow_array())); - - // only sort by timestamp and sequence. - let fields = arrays - .iter() - .take(2) - .map(|v| arrow::row::SortField::new(v.data_type().clone())) - .collect(); - - let mut converter = RowConverter::new(fields).context(SortValuesSnafu)?; - let rows = converter - .convert_columns(&arrays[0..2]) - .context(SortValuesSnafu)?; - let mut sort_pairs = rows.iter().enumerate().collect::>(); - sort_pairs.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); - let indices = - arrow::array::UInt32Array::from_iter_values(sort_pairs.iter().map(|(i, _)| *i as u32)); - - let res = arrays - .into_iter() - .map(|arr| arrow::compute::take(&arr, &indices, None)) - .collect::>>() - .context(SortValuesSnafu)?; - - self.timestamp = Helper::try_into_vector(&res[0]).context(ConvertVectorSnafu)?; - self.sequence = - Arc::new(UInt64Vector::try_from_arrow_array(&res[1]).context(ConvertVectorSnafu)?); - self.op_type = - Arc::new(UInt8Vector::try_from_arrow_array(&res[2]).context(ConvertVectorSnafu)?); - self.fields = Helper::try_into_vectors(&res[3..]).context(ConvertVectorSnafu)?; - Ok(()) - } - - /// Converts [Values] to `Batch`. + /// Converts [Values] to `Batch`, sorts the batch according to `timestamp, sequence` desc and + /// keeps only the latest row for the same timestamp. pub fn to_batch(&self, primary_key: &[u8], metadata: &RegionMetadataRef) -> Result { let builder = BatchBuilder::with_required_columns( primary_key.to_vec(), @@ -402,7 +358,9 @@ impl Values { }) .collect(); - builder.with_fields(fields).build() + let mut batch = builder.with_fields(fields).build()?; + batch.sort_and_dedup()?; + Ok(batch) } } @@ -445,9 +403,7 @@ mod tests { use datatypes::prelude::{ConcreteDataType, ScalarVector}; use datatypes::schema::ColumnSchema; use datatypes::value::{OrderedFloat, Value}; - use datatypes::vectors::{ - Float32Vector, Float64Vector, Int64Vector, TimestampMillisecondVector, - }; + use datatypes::vectors::{Float64Vector, Int64Vector, TimestampMillisecondVector}; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; @@ -547,20 +503,21 @@ mod tests { assert_eq!(1, series.frozen.len()); } - fn check_value(values: &Values, expect: Vec>) { - assert_eq!(values.sequence.len(), values.timestamp.len()); - assert_eq!(values.op_type.len(), values.timestamp.len()); - for f in &values.fields { - assert_eq!(f.len(), values.timestamp.len()); + fn check_value(batch: &Batch, expect: Vec>) { + let ts_len = batch.timestamps().len(); + assert_eq!(batch.sequences().len(), ts_len); + assert_eq!(batch.op_types().len(), ts_len); + for f in batch.fields() { + assert_eq!(f.data.len(), ts_len); } let mut rows = vec![]; - for idx in 0..values.timestamp.len() { - let mut row = Vec::with_capacity(values.fields.len() + 3); - row.push(values.timestamp.get(idx)); - row.push(values.sequence.get(idx)); - row.push(values.op_type.get(idx)); - row.extend(values.fields.iter().map(|f| f.get(idx))); + for idx in 0..ts_len { + let mut row = Vec::with_capacity(batch.fields().len() + 3); + row.push(batch.timestamps().get(idx)); + row.push(batch.sequences().get(idx)); + row.push(batch.op_types().get(idx)); + row.extend(batch.fields().iter().map(|f| f.data.get(idx))); rows.push(row); } @@ -572,45 +529,53 @@ mod tests { #[test] fn test_values_sort() { - let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 4, 3])); - let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 0])); - let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1])); + let schema = schema_for_test(); + let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 3, 4, 3])); + let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 1, 2])); + let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1, 0])); - let fields = vec![Arc::new(Float32Vector::from_vec(vec![1.1, 2.1, 3.3, 4.2])) as Arc<_>]; - let mut values = Values { + let fields = vec![ + Arc::new(Int64Vector::from_vec(vec![4, 3, 2, 1, 2])) as Arc<_>, + Arc::new(Float64Vector::from_vec(vec![1.1, 2.1, 4.2, 3.3, 4.2])) as Arc<_>, + ]; + let values = Values { timestamp: timestamp as Arc<_>, sequence, op_type, fields, }; - values.sort_in_place().unwrap(); + let batch = values.to_batch(b"test", &schema).unwrap(); check_value( - &values, + &batch, vec![ vec![ Value::Timestamp(Timestamp::new_millisecond(1)), Value::UInt64(1), Value::UInt8(1), - Value::Float32(OrderedFloat(1.1)), + Value::Int64(4), + Value::Float64(OrderedFloat(1.1)), ], vec![ Value::Timestamp(Timestamp::new_millisecond(2)), Value::UInt64(1), Value::UInt8(1), - Value::Float32(OrderedFloat(2.1)), + Value::Int64(3), + Value::Float64(OrderedFloat(2.1)), ], vec![ Value::Timestamp(Timestamp::new_millisecond(3)), - Value::UInt64(0), - Value::UInt8(1), - Value::Float32(OrderedFloat(4.2)), + Value::UInt64(2), + Value::UInt8(0), + Value::Int64(2), + Value::Float64(OrderedFloat(4.2)), ], vec![ Value::Timestamp(Timestamp::new_millisecond(4)), Value::UInt64(1), Value::UInt8(1), - Value::Float32(OrderedFloat(3.3)), + Value::Int64(1), + Value::Float64(OrderedFloat(3.3)), ], ], )