diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 169a77c8ad..2f7a585f82 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -331,6 +331,30 @@ pub enum Error { location: Location, source: datatypes::error::Error, }, + + #[snafu(display( + "Primary key length mismatch, expect: {}, actual: {}, location: {}", + expect, + actual, + location + ))] + PrimaryKeyLengthMismatch { + expect: usize, + actual: usize, + location: Location, + }, + + #[snafu(display("Failed to sort values source: {}, location: {}", source, location))] + SortValues { + source: ArrowError, + location: Location, + }, + + #[snafu(display("Failed to compact values, source: {}, location: {}", source, location))] + CompactValues { + source: datatypes::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -385,6 +409,9 @@ impl ErrorExt for Error { InvalidBatch { .. } => StatusCode::InvalidArguments, InvalidRecordBatch { .. } => StatusCode::InvalidArguments, ConvertVector { source, .. } => source.status_code(), + PrimaryKeyLengthMismatch { .. } => StatusCode::InvalidArguments, + SortValues { .. } => StatusCode::Unexpected, + CompactValues { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index fe9c9b8a1c..b2c7add861 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -14,6 +14,8 @@ //! Memtables are write buffers for regions. +pub mod time_series; + pub mod key_values; pub(crate) mod version; @@ -22,15 +24,19 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use store_api::metadata::RegionMetadataRef; +use store_api::storage::ScanRequest; use crate::error::Result; pub use crate::memtable::key_values::KeyValues; +use crate::read::Batch; /// Id for memtables. /// /// Should be unique under the same region. pub type MemtableId = u32; +pub type BoxedBatchIterator = Box>>; + /// In memory write buffer. pub trait Memtable: Send + Sync + fmt::Debug { /// Returns the id of this memtable. @@ -38,6 +44,8 @@ pub trait Memtable: Send + Sync + fmt::Debug { /// Write key values into the memtable. fn write(&self, kvs: &KeyValues) -> Result<()>; + + fn iter(&self, req: ScanRequest) -> BoxedBatchIterator; } pub type MemtableRef = Arc; @@ -73,6 +81,10 @@ impl Memtable for EmptyMemtable { fn write(&self, _kvs: &KeyValues) -> Result<()> { Ok(()) } + + fn iter(&self, _req: ScanRequest) -> BoxedBatchIterator { + Box::new(std::iter::empty()) + } } /// Default memtable builder. diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs new file mode 100644 index 0000000000..f2a0094a6b --- /dev/null +++ b/src/mito2/src/memtable/time_series.rs @@ -0,0 +1,766 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::btree_map::Entry; +use std::collections::{BTreeMap, Bound}; +use std::fmt::{Debug, Formatter}; +use std::sync::{Arc, RwLock}; + +use api::v1::OpType; +use datatypes::arrow; +use datatypes::arrow::row::RowConverter; +use datatypes::data_type::DataType; +use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef}; +use datatypes::value::ValueRef; +use datatypes::vectors::{ + Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, +}; +use snafu::{ensure, ResultExt}; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ScanRequest; + +use crate::error::{ + CompactValuesSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result, SortValuesSnafu, +}; +use crate::memtable::{BoxedBatchIterator, KeyValues, Memtable, MemtableId}; +use crate::read::{Batch, BatchBuilder, BatchColumn}; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; + +/// Initial vector builder capacity. +const INITIAL_BUILDER_CAPACITY: usize = 32; + +/// Memtable implementation that groups rows by their primary key. +pub struct TimeSeriesMemtable { + id: MemtableId, + region_metadata: RegionMetadataRef, + row_codec: McmpRowCodec, + series_set: SeriesSet, +} + +impl TimeSeriesMemtable { + pub fn new(region_metadata: RegionMetadataRef, id: MemtableId) -> Result { + let row_codec = McmpRowCodec::new( + region_metadata + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + ); + let series_set = SeriesSet::new(region_metadata.clone()); + Ok(Self { + id, + region_metadata, + series_set, + row_codec, + }) + } +} + +impl Debug for TimeSeriesMemtable { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TimeSeriesMemtable").finish() + } +} + +impl Memtable for TimeSeriesMemtable { + fn id(&self) -> MemtableId { + self.id + } + + fn write(&self, kvs: &KeyValues) -> Result<()> { + for kv in kvs.iter() { + ensure!( + kv.num_primary_keys() == self.row_codec.num_fields(), + PrimaryKeyLengthMismatchSnafu { + expect: self.row_codec.num_fields(), + actual: kv.num_primary_keys() + } + ); + let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?; + let fields = kv.fields().collect(); + let series = self.series_set.get_or_add_series(primary_key_encoded); + let mut guard = series.write().unwrap(); + guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields); + } + Ok(()) + } + + fn iter(&self, req: ScanRequest) -> BoxedBatchIterator { + let _projection = req.projection.map(|p| { + p.iter() + .map(|idx| self.region_metadata.column_metadatas[*idx].column_id) + .collect::>() + }); + + Box::new(self.series_set.iter_series()) + } +} + +type SeriesRwLockMap = RwLock, Arc>>>; + +struct SeriesSet { + region_metadata: RegionMetadataRef, + series: Arc, +} + +impl SeriesSet { + fn new(region_metadata: RegionMetadataRef) -> Self { + Self { + region_metadata, + series: Default::default(), + } + } +} + +impl SeriesSet { + /// Returns the series for given primary key, or create a new series if not already exist. + fn get_or_add_series(&self, primary_key: Vec) -> Arc> { + if let Some(series) = self.series.read().unwrap().get(&primary_key) { + return series.clone(); + }; + let s = Arc::new(RwLock::new(Series::new(&self.region_metadata))); + let mut indices = self.series.write().unwrap(); + match indices.entry(primary_key) { + Entry::Vacant(v) => { + v.insert(s.clone()); + s + } + // safety: series must exist at given index. + Entry::Occupied(v) => v.get().clone(), + } + } + + /// Iterates all series in [SeriesSet]. + fn iter_series(&self) -> Iter { + Iter { + metadata: self.region_metadata.clone(), + series: self.series.clone(), + last_key: None, + } + } +} + +struct Iter { + metadata: RegionMetadataRef, + series: Arc, + last_key: Option>, +} + +impl Iterator for Iter { + type Item = Result; + + fn next(&mut self) -> Option { + let map = self.series.read().unwrap(); + let mut range = match &self.last_key { + None => map.range::, _>(..), + Some(last_key) => { + map.range::, _>((Bound::Excluded(last_key), Bound::Unbounded)) + } + }; + + if let Some((primary_key, series)) = range.next() { + self.last_key = Some(primary_key.clone()); + let values = series.write().unwrap().compact(&self.metadata); + Some(values.and_then(|v| v.to_batch(primary_key, &self.metadata))) + } else { + None + } + } +} + +/// Bucket holds a set of [Series] which alleviate lock contention between series. +struct Bucket { + region_metadata: RegionMetadataRef, + series: RwLock>>>, +} + +impl Bucket { + fn new(region_metadata: RegionMetadataRef) -> Self { + Self { + region_metadata, + series: Default::default(), + } + } + + /// Returns the series at given index. + /// Returns None if series not found. + #[inline] + fn get_series(&self, idx: usize) -> Option>> { + self.series.read().unwrap().get(idx).cloned() + } + + /// Adds series to bucket and returns the index inside the bucket. + #[inline] + fn add_series(&self, s: Arc>) -> usize { + let mut series = self.series.write().unwrap(); + let idx = series.len(); + series.push(s); + idx + } +} + +/// A `Series` holds a list of field values of some given primary key. +struct Series { + active: ValueBuilder, + frozen: Vec, +} + +impl Series { + fn new(region_metadata: &RegionMetadataRef) -> Self { + Self { + active: ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY), + frozen: vec![], + } + } + + /// Pushes a row of values into Series. + fn push(&mut self, ts: ValueRef, sequence: u64, op_type: OpType, values: Vec) { + self.active.push(ts, sequence, op_type as u8, values); + } + + /// Freezes the active part and push it to `frozen`. + fn freeze(&mut self, region_metadata: &RegionMetadataRef) { + if self.active.len() != 0 { + let mut builder = ValueBuilder::new(region_metadata, INITIAL_BUILDER_CAPACITY); + std::mem::swap(&mut self.active, &mut builder); + self.frozen.push(Values::from(builder)); + } + } + + /// Freezes active part to frozen part and compact frozen part to reduce memory fragmentation. + /// Returns the frozen and compacted values. + fn compact(&mut self, region_metadata: &RegionMetadataRef) -> Result { + self.freeze(region_metadata); + + let mut frozen = self.frozen.clone(); + let values = if frozen.len() == 1 { + frozen.pop().unwrap() + } else { + // TODO(hl): We should keep track of min/max timestamps for each values and avoid + // cloning and sorting when values do not overlap with each other. + + let total_len: usize = frozen.iter().map(|v| v.timestamp.len()).sum(); + let mut builder = ValueBuilder::new(region_metadata, total_len); + + for v in frozen { + let len = v.timestamp.len(); + builder + .timestamp + .extend_slice_of(&*v.timestamp, 0, len) + .context(CompactValuesSnafu)?; + builder + .sequence + .extend_slice_of(&*v.sequence, 0, len) + .context(CompactValuesSnafu)?; + + builder + .op_type + .extend_slice_of(&*v.op_type, 0, len) + .context(CompactValuesSnafu)?; + + for (idx, f) in v.fields.iter().enumerate() { + builder.fields[idx] + .extend_slice_of(&**f, 0, len) + .context(CompactValuesSnafu)?; + } + } + + let values = Values::from(builder); + self.frozen = vec![values.clone()]; + values + }; + Ok(values) + } +} + +/// `ValueBuilder` holds all the vector builders for field columns. +struct ValueBuilder { + timestamp: Box, + sequence: UInt64VectorBuilder, + op_type: UInt8VectorBuilder, + fields: Vec>, +} + +impl ValueBuilder { + fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self { + let timestamp = region_metadata + .time_index_column() + .column_schema + .data_type + .create_mutable_vector(capacity); + let sequence = UInt64VectorBuilder::with_capacity(capacity); + let op_type = UInt8VectorBuilder::with_capacity(capacity); + + let fields = region_metadata + .field_columns() + .map(|c| c.column_schema.data_type.create_mutable_vector(capacity)) + .collect(); + + Self { + timestamp, + sequence, + op_type, + fields, + } + } + + /// Pushes a new row to `ValueBuilder`. + /// We don't need primary keys since they've already be encoded. + fn push(&mut self, ts: ValueRef, sequence: u64, op_type: u8, fields: Vec) { + debug_assert_eq!(fields.len(), self.fields.len()); + self.timestamp.push_value_ref(ts); + self.sequence.push_value_ref(ValueRef::UInt64(sequence)); + self.op_type.push_value_ref(ValueRef::UInt8(op_type)); + for (idx, field_value) in fields.into_iter().enumerate() { + self.fields[idx].push_value_ref(field_value); + } + } + + /// Returns the length of [ValueBuilder] + fn len(&self) -> usize { + let timestamp_len = self.timestamp.len(); + debug_assert_eq!(timestamp_len, self.op_type.len()); + debug_assert_eq!(timestamp_len, self.sequence.len()); + timestamp_len + } +} + +/// [Values] holds an immutable vectors of field columns, including `sequence` and `op_typee`. +#[derive(Clone)] +struct Values { + timestamp: VectorRef, + sequence: Arc, + op_type: Arc, + fields: Vec, +} + +impl Values { + /// Sorts values in place by `timestamp, sequence, op_type`. + fn sort_in_place(&mut self) -> Result<()> { + let mut arrays = Vec::with_capacity(3 + self.fields.len()); + arrays.push(self.timestamp.to_arrow_array()); + arrays.push(self.sequence.to_arrow_array()); + arrays.push(self.op_type.to_arrow_array()); + arrays.extend(self.fields.iter().map(|f| f.to_arrow_array())); + + // only sort by timestamp and sequence. + let fields = arrays + .iter() + .take(2) + .map(|v| arrow::row::SortField::new(v.data_type().clone())) + .collect(); + + let mut converter = RowConverter::new(fields).context(SortValuesSnafu)?; + let rows = converter + .convert_columns(&arrays[0..2]) + .context(SortValuesSnafu)?; + let mut sort_pairs = rows.iter().enumerate().collect::>(); + sort_pairs.sort_unstable_by(|(_, a), (_, b)| a.cmp(b)); + let indices = + arrow::array::UInt32Array::from_iter_values(sort_pairs.iter().map(|(i, _)| *i as u32)); + + let res = arrays + .into_iter() + .map(|arr| arrow::compute::take(&arr, &indices, None)) + .collect::>>() + .context(SortValuesSnafu)?; + + self.timestamp = Helper::try_into_vector(&res[0]).context(ConvertVectorSnafu)?; + self.sequence = + Arc::new(UInt64Vector::try_from_arrow_array(&res[1]).context(ConvertVectorSnafu)?); + self.op_type = + Arc::new(UInt8Vector::try_from_arrow_array(&res[2]).context(ConvertVectorSnafu)?); + self.fields = Helper::try_into_vectors(&res[3..]).context(ConvertVectorSnafu)?; + Ok(()) + } + + /// Converts [Values] to `Batch`. + pub fn to_batch(&self, primary_key: &[u8], metadata: &RegionMetadataRef) -> Result { + let builder = BatchBuilder::with_required_columns( + primary_key.to_vec(), + self.timestamp.clone(), + self.sequence.clone(), + self.op_type.clone(), + ); + + let fields = metadata + .field_columns() + .zip(self.fields.iter()) + .map(|(c, f)| BatchColumn { + column_id: c.column_id, + data: f.clone(), + }) + .collect(); + + builder.with_fields(fields).build() + } +} + +impl From for Values { + fn from(mut value: ValueBuilder) -> Self { + let fields = value + .fields + .iter_mut() + .map(|v| v.to_vector()) + .collect::>(); + let sequence = Arc::new(value.sequence.finish()); + let op_type = Arc::new(value.op_type.finish()); + let timestamp = value.timestamp.to_vector(); + + if cfg!(debug_assertions) { + debug_assert_eq!(timestamp.len(), sequence.len()); + debug_assert_eq!(timestamp.len(), op_type.len()); + for field in &fields { + debug_assert_eq!(timestamp.len(), field.len()); + } + } + + Self { + timestamp, + sequence, + op_type, + fields, + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use api::helper::ColumnDataTypeWrapper; + use api::v1::value::ValueData; + use api::v1::{Row, Rows, SemanticType}; + use common_time::Timestamp; + use datatypes::prelude::{ConcreteDataType, ScalarVector}; + use datatypes::schema::ColumnSchema; + use datatypes::value::{OrderedFloat, Value}; + use datatypes::vectors::{ + Float32Vector, Float64Vector, Int64Vector, TimestampMillisecondVector, + }; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + + fn schema_for_test() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("k1", ConcreteDataType::int64_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 4, + }) + .primary_key(vec![0, 1]); + let region_metadata = builder.build().unwrap(); + Arc::new(region_metadata) + } + + fn ts_value_ref(val: i64) -> ValueRef<'static> { + ValueRef::Timestamp(Timestamp::new_millisecond(val)) + } + + fn field_value_ref(v0: i64, v1: f64) -> Vec> { + vec![ValueRef::Int64(v0), ValueRef::Float64(OrderedFloat(v1))] + } + + fn check_values(values: Values, expect: &[(i64, u64, u8, i64, f64)]) { + let ts = values + .timestamp + .as_any() + .downcast_ref::() + .unwrap(); + + let v0 = values.fields[0] + .as_any() + .downcast_ref::() + .unwrap(); + let v1 = values.fields[1] + .as_any() + .downcast_ref::() + .unwrap(); + let read = ts + .iter_data() + .zip(values.sequence.iter_data()) + .zip(values.op_type.iter_data()) + .zip(v0.iter_data()) + .zip(v1.iter_data()) + .map(|((((ts, sequence), op_type), v0), v1)| { + ( + ts.unwrap().0.value(), + sequence.unwrap(), + op_type.unwrap(), + v0.unwrap(), + v1.unwrap(), + ) + }) + .collect::>(); + assert_eq!(expect, &read); + } + + #[test] + fn test_series() { + let region_metadata = schema_for_test(); + let mut series = Series::new(®ion_metadata); + series.push(ts_value_ref(1), 0, OpType::Put, field_value_ref(1, 10.1)); + series.push(ts_value_ref(2), 0, OpType::Put, field_value_ref(2, 10.2)); + assert_eq!(2, series.active.timestamp.len()); + assert_eq!(0, series.frozen.len()); + + let values = series.compact(®ion_metadata).unwrap(); + check_values(values, &[(1, 0, 1, 1, 10.1), (2, 0, 1, 2, 10.2)]); + assert_eq!(0, series.active.timestamp.len()); + assert_eq!(1, series.frozen.len()); + } + + fn check_value(values: &Values, expect: Vec>) { + assert_eq!(values.sequence.len(), values.timestamp.len()); + assert_eq!(values.op_type.len(), values.timestamp.len()); + for f in &values.fields { + assert_eq!(f.len(), values.timestamp.len()); + } + + let mut rows = vec![]; + for idx in 0..values.timestamp.len() { + let mut row = Vec::with_capacity(values.fields.len() + 3); + row.push(values.timestamp.get(idx)); + row.push(values.sequence.get(idx)); + row.push(values.op_type.get(idx)); + row.extend(values.fields.iter().map(|f| f.get(idx))); + rows.push(row); + } + + assert_eq!(expect.len(), rows.len()); + for (idx, row) in rows.iter().enumerate() { + assert_eq!(&expect[idx], row); + } + } + + #[test] + fn test_values_sort() { + let timestamp = Arc::new(TimestampMillisecondVector::from_vec(vec![1, 2, 4, 3])); + let sequence = Arc::new(UInt64Vector::from_vec(vec![1, 1, 1, 0])); + let op_type = Arc::new(UInt8Vector::from_vec(vec![1, 1, 1, 1])); + + let fields = vec![Arc::new(Float32Vector::from_vec(vec![1.1, 2.1, 3.3, 4.2])) as Arc<_>]; + let mut values = Values { + timestamp: timestamp as Arc<_>, + sequence, + op_type, + fields, + }; + values.sort_in_place().unwrap(); + + check_value( + &values, + vec![ + vec![ + Value::Timestamp(Timestamp::new_millisecond(1)), + Value::UInt64(1), + Value::UInt8(1), + Value::Float32(OrderedFloat(1.1)), + ], + vec![ + Value::Timestamp(Timestamp::new_millisecond(2)), + Value::UInt64(1), + Value::UInt8(1), + Value::Float32(OrderedFloat(2.1)), + ], + vec![ + Value::Timestamp(Timestamp::new_millisecond(3)), + Value::UInt64(0), + Value::UInt8(1), + Value::Float32(OrderedFloat(4.2)), + ], + vec![ + Value::Timestamp(Timestamp::new_millisecond(4)), + Value::UInt64(1), + Value::UInt8(1), + Value::Float32(OrderedFloat(3.3)), + ], + ], + ) + } + + fn build_key_values(schema: &RegionMetadataRef, len: usize) -> KeyValues { + let column_schema = schema + .column_metadatas + .iter() + .map(|c| api::v1::ColumnSchema { + column_name: c.column_schema.name.clone(), + datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone()) + .unwrap() + .datatype() as i32, + semantic_type: c.semantic_type as i32, + }) + .collect(); + + let rows = (0..len) + .map(|i| Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue(i.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::I64Value(i as i64)), + }, + api::v1::Value { + value_data: Some(ValueData::TsMillisecondValue(i as i64)), + }, + api::v1::Value { + value_data: Some(ValueData::I64Value(i as i64)), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value(i as f64)), + }, + ], + }) + .collect(); + let mutation = api::v1::Mutation { + op_type: 1, + sequence: 0, + rows: Some(Rows { + schema: column_schema, + rows, + }), + }; + KeyValues::new(schema.as_ref(), mutation).unwrap() + } + + #[test] + fn test_series_set_concurrency() { + let schema = schema_for_test(); + let set = Arc::new(SeriesSet::new(schema.clone())); + + let concurrency = 32; + let pk_num = concurrency * 2; + let mut handles = Vec::with_capacity(concurrency); + for i in 0..concurrency { + let set = set.clone(); + let handle = std::thread::spawn(move || { + for j in i * 100..(i + 1) * 100 { + let pk = j % pk_num; + let primary_key = format!("pk-{}", pk).as_bytes().to_vec(); + let series = set.get_or_add_series(primary_key); + let mut guard = series.write().unwrap(); + guard.push( + ts_value_ref(j as i64), + j as u64, + OpType::Put, + field_value_ref(j as i64, j as f64), + ); + } + }); + handles.push(handle); + } + for h in handles { + h.join().unwrap(); + } + + let mut timestamps = Vec::with_capacity(concurrency * 100); + let mut sequences = Vec::with_capacity(concurrency * 100); + let mut op_types = Vec::with_capacity(concurrency * 100); + let mut v0 = Vec::with_capacity(concurrency * 100); + + for i in 0..pk_num { + let pk = format!("pk-{}", i).as_bytes().to_vec(); + let series = set.get_or_add_series(pk); + let mut guard = series.write().unwrap(); + let values = guard.compact(&schema).unwrap(); + timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64)); + sequences.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64)); + op_types.extend(values.op_type.iter_data().map(|v| v.unwrap())); + v0.extend( + values + .fields + .get(0) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter_data() + .map(|v| v.unwrap()), + ); + } + + let expected_sequence = (0..(concurrency * 100) as i64).collect::>(); + assert_eq!( + expected_sequence, + sequences.iter().copied().collect::>() + ); + + op_types.iter().all(|op| *op == OpType::Put as u8); + assert_eq!( + expected_sequence, + timestamps.iter().copied().collect::>() + ); + + assert_eq!(timestamps, sequences); + assert_eq!(v0, timestamps); + } + + #[test] + fn test_memtable() { + common_telemetry::init_default_ut_logging(); + let schema = schema_for_test(); + let kvs = build_key_values(&schema, 100); + let memtable = TimeSeriesMemtable::new(schema, 42).unwrap(); + memtable.write(&kvs).unwrap(); + + let expected_ts = kvs + .iter() + .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value()) + .collect::>(); + + let iter = memtable.iter(ScanRequest::default()); + let read = iter + .flat_map(|batch| { + batch + .unwrap() + .timestamps() + .as_any() + .downcast_ref::() + .unwrap() + .iter_data() + .collect::>() + .into_iter() + }) + .map(|v| v.unwrap().0.value()) + .collect::>(); + assert_eq!(expected_ts, read); + } +} diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index f755aec003..603d39286d 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -22,23 +22,22 @@ use datatypes::value::ValueRef; use memcomparable::{Deserializer, Serializer}; use paste::paste; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::ResultExt; use crate::error; -use crate::error::{ - FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, RowLengthMismatchSnafu, - SerializeFieldSnafu, -}; +use crate::error::{FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu}; /// Row value encoder/decoder. pub trait RowCodec { /// Encodes rows to bytes. - fn encode<'a, I>(&self, rows: I) -> Result> + /// # Note + /// Ensure the length of row iterator matches the length of fields. + fn encode<'a, I>(&self, row: I) -> Result> where - I: Iterator]>; + I: Iterator>; /// Decode row values from bytes. - fn decode(&self, bytes: &[u8]) -> Result>>; + fn decode(&self, bytes: &[u8]) -> Result>; } pub struct SortField { @@ -209,48 +208,37 @@ impl McmpRowCodec { Self { fields } } + pub fn num_fields(&self) -> usize { + self.fields.len() + } + /// Estimated length for encoded bytes. - fn estimated_length(&self) -> usize { + pub fn estimated_size(&self) -> usize { self.fields.iter().map(|f| f.estimated_size()).sum() } } impl RowCodec for McmpRowCodec { - fn encode<'a, I>(&self, rows: I) -> Result> + fn encode<'a, I>(&self, row: I) -> Result> where - I: Iterator]>, + I: Iterator>, { - let mut bytes = Vec::with_capacity(self.estimated_length()); - let mut serializer = memcomparable::Serializer::new(&mut bytes); - - for row in rows { - ensure!( - row.len() == self.fields.len(), - RowLengthMismatchSnafu { - expect: self.fields.len(), - actual: row.len(), - } - ); - - for (value, field) in row.iter().zip(self.fields.iter()) { - field.serialize(&mut serializer, value)?; - } + let mut bytes = Vec::with_capacity(self.estimated_size()); + let mut serializer = Serializer::new(&mut bytes); + for (value, field) in row.zip(self.fields.iter()) { + field.serialize(&mut serializer, &value)?; } Ok(bytes) } - fn decode(&self, bytes: &[u8]) -> Result>> { - let mut deserializer = memcomparable::Deserializer::new(bytes); - let mut res = vec![]; - while deserializer.has_remaining() { - let mut values = Vec::with_capacity(self.fields.len()); - for f in &self.fields { - let value = f.deserialize(&mut deserializer)?; - values.push(value); - } - res.push(values); + fn decode(&self, bytes: &[u8]) -> Result> { + let mut deserializer = Deserializer::new(bytes); + let mut values = Vec::with_capacity(self.fields.len()); + for f in &self.fields { + let value = f.deserialize(&mut deserializer)?; + values.push(value); } - Ok(res) + Ok(values) } } @@ -262,7 +250,7 @@ mod tests { use super::*; - fn check_encode_and_decode(data_types: &[ConcreteDataType], rows: &[Vec]) { + fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec) { let encoder = McmpRowCodec::new( data_types .iter() @@ -270,19 +258,11 @@ mod tests { .collect::>(), ); - let value_ref = rows - .iter() - .map(|row| row.iter().map(|v| v.as_value_ref()).collect::>()) - .collect::>(); - let result = encoder - .encode(value_ref.iter().map(|r| r.as_slice())) - .unwrap(); - let decoded = encoder.decode(&result).unwrap(); - assert_eq!(value_ref.len(), decoded.len()); + let value_ref = row.iter().map(|v| v.as_value_ref()).collect::>(); - for (i, row) in rows.iter().enumerate() { - assert_eq!(row, decoded.get(i).unwrap() as &[Value]); - } + let result = encoder.encode(value_ref.iter().cloned()).unwrap(); + let decoded = encoder.decode(&result).unwrap(); + assert_eq!(decoded, row); } #[test] @@ -293,11 +273,10 @@ mod tests { ]); let values = [Value::String("abcdefgh".into()), Value::Int64(128)]; let value_ref = values.iter().map(|v| v.as_value_ref()).collect::>(); - let result = encoder.encode(std::iter::once(&value_ref as _)).unwrap(); + let result = encoder.encode(value_ref.iter().cloned()).unwrap(); let decoded = encoder.decode(&result).unwrap(); - assert_eq!(1, decoded.len()); - assert_eq!(&values, decoded.get(0).unwrap() as &[Value]); + assert_eq!(&values, &decoded as &[Value]); } #[test] @@ -307,10 +286,10 @@ mod tests { ConcreteDataType::timestamp_millisecond_datatype(), ConcreteDataType::int64_datatype(), ], - &[vec![ + vec![ Value::Timestamp(Timestamp::new_millisecond(42)), Value::Int64(43), - ]], + ], ); } @@ -321,10 +300,10 @@ mod tests { ConcreteDataType::binary_datatype(), ConcreteDataType::int64_datatype(), ], - &[vec![ + vec![ Value::Binary(Bytes::from("hello".as_bytes())), Value::Int64(43), - ]], + ], ); } @@ -332,12 +311,18 @@ mod tests { fn test_memcmp_string() { check_encode_and_decode( &[ConcreteDataType::string_datatype()], - &[ - vec![Value::String(StringBytes::from("hello"))], - vec![Value::Null], - vec![Value::String("".into())], - vec![Value::String("world".into())], - ], + vec![Value::String(StringBytes::from("hello"))], + ); + + check_encode_and_decode(&[ConcreteDataType::string_datatype()], vec![Value::Null]); + + check_encode_and_decode( + &[ConcreteDataType::string_datatype()], + vec![Value::String("".into())], + ); + check_encode_and_decode( + &[ConcreteDataType::string_datatype()], + vec![Value::String("world".into())], ); } @@ -348,7 +333,7 @@ mod tests { ConcreteDataType::string_datatype(), ConcreteDataType::int32_datatype(), ], - &[vec![Value::String(StringBytes::from("abcd")), Value::Null]], + vec![Value::String(StringBytes::from("abcd")), Value::Null], ) } @@ -360,19 +345,33 @@ mod tests { ConcreteDataType::int64_datatype(), ConcreteDataType::boolean_datatype(), ], - &[ - vec![ - Value::String("hello".into()), - Value::Int64(42), - Value::Boolean(false), - ], - vec![ - Value::String("world".into()), - Value::Int64(43), - Value::Boolean(true), - ], - vec![Value::Null, Value::Int64(43), Value::Boolean(true)], + vec![ + Value::String("hello".into()), + Value::Int64(42), + Value::Boolean(false), ], ); + + check_encode_and_decode( + &[ + ConcreteDataType::string_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::boolean_datatype(), + ], + vec![ + Value::String("world".into()), + Value::Int64(43), + Value::Boolean(true), + ], + ); + + check_encode_and_decode( + &[ + ConcreteDataType::string_datatype(), + ConcreteDataType::int64_datatype(), + ConcreteDataType::boolean_datatype(), + ], + vec![Value::Null, Value::Int64(43), Value::Boolean(true)], + ); } } diff --git a/src/storage/src/schema/region.rs b/src/storage/src/schema/region.rs index 31754cc2a1..e601da45a1 100644 --- a/src/storage/src/schema/region.rs +++ b/src/storage/src/schema/region.rs @@ -131,7 +131,7 @@ impl RegionSchema { } #[inline] - pub(crate) fn timestamp_index(&self) -> usize { + pub fn timestamp_index(&self) -> usize { self.store_schema.timestamp_index() } @@ -146,7 +146,7 @@ impl RegionSchema { } #[inline] - pub(crate) fn column_metadata(&self, idx: usize) -> &ColumnMetadata { + pub fn column_metadata(&self, idx: usize) -> &ColumnMetadata { self.columns.column_metadata(idx) } diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index e123563d80..7c5b75e869 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -169,6 +169,13 @@ impl RegionMetadata { .map(|index| &self.column_metadatas[index]) } + pub fn primary_key_columns(&self) -> impl Iterator { + // safety: RegionMetadata::validate ensures every primary key exists. + self.primary_key + .iter() + .map(|id| self.column_by_id(*id).unwrap()) + } + /// Returns all field columns. pub fn field_columns(&self) -> impl Iterator { self.column_metadatas