diff --git a/src/datatypes/src/macros.rs b/src/datatypes/src/macros.rs index bc8392628e..0f309b630a 100644 --- a/src/datatypes/src/macros.rs +++ b/src/datatypes/src/macros.rs @@ -40,7 +40,7 @@ macro_rules! for_all_primitive_types{ #[macro_export] macro_rules! with_match_primitive_type_id { - ($key_type:expr, | $_:tt $T:ident | $body:tt, $nbody:tt) => {{ + ($key_type:expr, | $_:tt $T:ident | $body:tt, $nbody:tt) => {{ macro_rules! __with_ty__ { ( $_ $T:ident ) => { $body diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index dee4b25364..8fec708cf8 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -1,7 +1,9 @@ use common_base::bytes::{Bytes, StringBytes}; -use ordered_float::OrderedFloat; +pub use ordered_float::OrderedFloat; use serde::{Serialize, Serializer}; +use crate::data_type::ConcreteDataType; + pub type OrderedF32 = OrderedFloat; pub type OrderedF64 = OrderedFloat; @@ -36,6 +38,38 @@ pub enum Value { DateTime(i64), } +impl Value { + /// Returns data type of the value. + /// + /// # Panics + /// Panics if the data type is not supported. + pub fn data_type(&self) -> ConcreteDataType { + match self { + Value::Null => ConcreteDataType::null_datatype(), + Value::Boolean(_) => ConcreteDataType::boolean_datatype(), + Value::UInt8(_) => ConcreteDataType::uint8_datatype(), + Value::UInt16(_) => ConcreteDataType::uint16_datatype(), + Value::UInt32(_) => ConcreteDataType::uint32_datatype(), + Value::UInt64(_) => ConcreteDataType::uint64_datatype(), + Value::Int8(_) => ConcreteDataType::int8_datatype(), + Value::Int16(_) => ConcreteDataType::int16_datatype(), + Value::Int32(_) => ConcreteDataType::int32_datatype(), + Value::Int64(_) => ConcreteDataType::int64_datatype(), + Value::Float32(_) => ConcreteDataType::float32_datatype(), + Value::Float64(_) => ConcreteDataType::float64_datatype(), + Value::String(_) => ConcreteDataType::string_datatype(), + Value::Binary(_) => ConcreteDataType::binary_datatype(), + Value::Date(_) | Value::DateTime(_) => { + unimplemented!("Unsupported data type of value {:?}", self) + } + } + } + + pub fn is_null(&self) -> bool { + matches!(self, Value::Null) + } +} + macro_rules! impl_from { ($Variant:ident, $Type:ident) => { impl From<$Type> for Value { @@ -180,6 +214,66 @@ mod tests { assert_eq!(Value::Binary(bytes.clone()), Value::from(bytes)); } + #[test] + fn test_value_datatype() { + assert_eq!( + ConcreteDataType::boolean_datatype(), + Value::Boolean(true).data_type() + ); + assert_eq!( + ConcreteDataType::uint8_datatype(), + Value::UInt8(u8::MIN).data_type() + ); + assert_eq!( + ConcreteDataType::uint16_datatype(), + Value::UInt16(u16::MIN).data_type() + ); + assert_eq!( + ConcreteDataType::uint16_datatype(), + Value::UInt16(u16::MAX).data_type() + ); + assert_eq!( + ConcreteDataType::uint32_datatype(), + Value::UInt32(u32::MIN).data_type() + ); + assert_eq!( + ConcreteDataType::uint64_datatype(), + Value::UInt64(u64::MIN).data_type() + ); + assert_eq!( + ConcreteDataType::int8_datatype(), + Value::Int8(i8::MIN).data_type() + ); + assert_eq!( + ConcreteDataType::int16_datatype(), + Value::Int16(i16::MIN).data_type() + ); + assert_eq!( + ConcreteDataType::int32_datatype(), + Value::Int32(i32::MIN).data_type() + ); + assert_eq!( + ConcreteDataType::int64_datatype(), + Value::Int64(i64::MIN).data_type() + ); + assert_eq!( + ConcreteDataType::float32_datatype(), + Value::Float32(OrderedFloat(f32::MIN)).data_type(), + ); + assert_eq!( + ConcreteDataType::float64_datatype(), + Value::Float64(OrderedFloat(f64::MIN)).data_type(), + ); + assert_eq!( + ConcreteDataType::string_datatype(), + Value::String(StringBytes::from("hello")).data_type(), + ); + assert_eq!( + ConcreteDataType::binary_datatype(), + Value::Binary(Bytes::from(b"world".as_slice())).data_type() + ); + } + #[test] fn test_value_from_string() { let hello = "hello".to_string(); diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index 89134df4e4..2624512739 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -1,5 +1,6 @@ pub mod binary; pub mod boolean; +mod builder; pub mod constant; mod helper; pub mod mutable; @@ -14,6 +15,7 @@ use arrow::array::ArrayRef; use arrow::bitmap::Bitmap; pub use binary::*; pub use boolean::*; +pub use builder::VectorBuilder; pub use constant::*; pub use helper::Helper; pub use mutable::MutableVector; @@ -26,11 +28,6 @@ use crate::data_type::ConcreteDataType; use crate::error::{BadArrayAccessSnafu, Result}; use crate::serialize::Serializable; use crate::value::Value; -pub use crate::vectors::{ - BinaryVector, BooleanVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector, - Int64Vector, Int8Vector, NullVector, StringVector, UInt16Vector, UInt32Vector, UInt64Vector, - UInt8Vector, -}; #[derive(Debug, PartialEq)] pub enum Validity<'a> { diff --git a/src/datatypes/src/vectors/builder.rs b/src/datatypes/src/vectors/builder.rs new file mode 100644 index 0000000000..fc2c2bf87f --- /dev/null +++ b/src/datatypes/src/vectors/builder.rs @@ -0,0 +1,225 @@ +use std::sync::Arc; + +use crate::data_type::ConcreteDataType; +use crate::scalars::ScalarVectorBuilder; +use crate::value::Value; +use crate::vectors::{ + BinaryVectorBuilder, BooleanVectorBuilder, Float32VectorBuilder, Float64VectorBuilder, + Int16VectorBuilder, Int32VectorBuilder, Int64VectorBuilder, Int8VectorBuilder, NullVector, + StringVectorBuilder, UInt16VectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, + UInt8VectorBuilder, VectorRef, +}; + +pub enum VectorBuilder { + Null(usize), + + // Numeric types: + Boolean(BooleanVectorBuilder), + UInt8(UInt8VectorBuilder), + UInt16(UInt16VectorBuilder), + UInt32(UInt32VectorBuilder), + UInt64(UInt64VectorBuilder), + Int8(Int8VectorBuilder), + Int16(Int16VectorBuilder), + Int32(Int32VectorBuilder), + Int64(Int64VectorBuilder), + Float32(Float32VectorBuilder), + Float64(Float64VectorBuilder), + + // String types: + String(StringVectorBuilder), + Binary(BinaryVectorBuilder), +} + +impl VectorBuilder { + pub fn new(data_type: ConcreteDataType) -> VectorBuilder { + VectorBuilder::with_capacity(data_type, 0) + } + + pub fn with_capacity(data_type: ConcreteDataType, capacity: usize) -> VectorBuilder { + match data_type { + ConcreteDataType::Null(_) => VectorBuilder::Null(0), + ConcreteDataType::Boolean(_) => { + VectorBuilder::Boolean(BooleanVectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::UInt8(_) => { + VectorBuilder::UInt8(UInt8VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::UInt16(_) => { + VectorBuilder::UInt16(UInt16VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::UInt32(_) => { + VectorBuilder::UInt32(UInt32VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::UInt64(_) => { + VectorBuilder::UInt64(UInt64VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::Int8(_) => { + VectorBuilder::Int8(Int8VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::Int16(_) => { + VectorBuilder::Int16(Int16VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::Int32(_) => { + VectorBuilder::Int32(Int32VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::Int64(_) => { + VectorBuilder::Int64(Int64VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::Float32(_) => { + VectorBuilder::Float32(Float32VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::Float64(_) => { + VectorBuilder::Float64(Float64VectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::String(_) => { + VectorBuilder::String(StringVectorBuilder::with_capacity(capacity)) + } + ConcreteDataType::Binary(_) => { + VectorBuilder::Binary(BinaryVectorBuilder::with_capacity(capacity)) + } + } + } + + pub fn push(&mut self, value: &Value) { + if value.is_null() { + self.push_null(); + return; + } + + match (self, value) { + (VectorBuilder::Boolean(b), Value::Boolean(v)) => b.push(Some(*v)), + (VectorBuilder::UInt8(b), Value::UInt8(v)) => b.push(Some(*v)), + (VectorBuilder::UInt16(b), Value::UInt16(v)) => b.push(Some(*v)), + (VectorBuilder::UInt32(b), Value::UInt32(v)) => b.push(Some(*v)), + (VectorBuilder::UInt64(b), Value::UInt64(v)) => b.push(Some(*v)), + (VectorBuilder::Int8(b), Value::Int8(v)) => b.push(Some(*v)), + (VectorBuilder::Int16(b), Value::Int16(v)) => b.push(Some(*v)), + (VectorBuilder::Int32(b), Value::Int32(v)) => b.push(Some(*v)), + (VectorBuilder::Int64(b), Value::Int64(v)) => b.push(Some(*v)), + (VectorBuilder::Float32(b), Value::Float32(v)) => b.push(Some(v.into_inner())), + (VectorBuilder::Float64(b), Value::Float64(v)) => b.push(Some(v.into_inner())), + (VectorBuilder::String(b), Value::String(v)) => b.push(Some(v.as_utf8())), + (VectorBuilder::Binary(b), Value::Binary(v)) => b.push(Some(v)), + _ => panic!("Value {:?} does not match builder type", value), + } + } + + pub fn push_null(&mut self) { + match self { + VectorBuilder::Null(v) => *v += 1, + VectorBuilder::Boolean(b) => b.push(None), + VectorBuilder::UInt8(b) => b.push(None), + VectorBuilder::UInt16(b) => b.push(None), + VectorBuilder::UInt32(b) => b.push(None), + VectorBuilder::UInt64(b) => b.push(None), + VectorBuilder::Int8(b) => b.push(None), + VectorBuilder::Int16(b) => b.push(None), + VectorBuilder::Int32(b) => b.push(None), + VectorBuilder::Int64(b) => b.push(None), + VectorBuilder::Float32(b) => b.push(None), + VectorBuilder::Float64(b) => b.push(None), + VectorBuilder::String(b) => b.push(None), + VectorBuilder::Binary(b) => b.push(None), + } + } + + pub fn finish(&mut self) -> VectorRef { + match self { + VectorBuilder::Null(v) => Arc::new(NullVector::new(*v)), + VectorBuilder::Boolean(b) => Arc::new(b.finish()), + VectorBuilder::UInt8(b) => Arc::new(b.finish()), + VectorBuilder::UInt16(b) => Arc::new(b.finish()), + VectorBuilder::UInt32(b) => Arc::new(b.finish()), + VectorBuilder::UInt64(b) => Arc::new(b.finish()), + VectorBuilder::Int8(b) => Arc::new(b.finish()), + VectorBuilder::Int16(b) => Arc::new(b.finish()), + VectorBuilder::Int32(b) => Arc::new(b.finish()), + VectorBuilder::Int64(b) => Arc::new(b.finish()), + VectorBuilder::Float32(b) => Arc::new(b.finish()), + VectorBuilder::Float64(b) => Arc::new(b.finish()), + VectorBuilder::String(b) => Arc::new(b.finish()), + VectorBuilder::Binary(b) => Arc::new(b.finish()), + } + } +} + +#[cfg(test)] +mod tests { + use ordered_float::OrderedFloat; + + use super::*; + + macro_rules! impl_integer_builder_test { + ($Type: ident, $datatype: ident) => { + let mut builder = VectorBuilder::with_capacity(ConcreteDataType::$datatype(), 10); + for i in 0..10 { + builder.push(&Value::$Type(i)); + } + let vector = builder.finish(); + + for i in 0..10 { + assert_eq!(Value::$Type(i), vector.get(i as usize)); + } + + let mut builder = VectorBuilder::new(ConcreteDataType::$datatype()); + builder.push(&Value::Null); + builder.push(&Value::$Type(100)); + let vector = builder.finish(); + + assert!(vector.is_null(0)); + assert_eq!(Value::$Type(100), vector.get(1)); + }; + } + + #[test] + fn test_null_vector_builder() { + let mut builder = VectorBuilder::new(ConcreteDataType::null_datatype()); + builder.push(&Value::Null); + let vector = builder.finish(); + assert!(vector.is_null(0)); + } + + #[test] + fn test_integer_vector_builder() { + impl_integer_builder_test!(UInt8, uint8_datatype); + impl_integer_builder_test!(UInt16, uint16_datatype); + impl_integer_builder_test!(UInt32, uint32_datatype); + impl_integer_builder_test!(UInt64, uint64_datatype); + impl_integer_builder_test!(Int8, int8_datatype); + impl_integer_builder_test!(Int16, int16_datatype); + impl_integer_builder_test!(Int32, int32_datatype); + impl_integer_builder_test!(Int64, int64_datatype); + } + + #[test] + fn test_float_vector_builder() { + let mut builder = VectorBuilder::new(ConcreteDataType::float32_datatype()); + builder.push(&Value::Float32(OrderedFloat(1.0))); + let vector = builder.finish(); + assert_eq!(Value::Float32(OrderedFloat(1.0)), vector.get(0)); + + let mut builder = VectorBuilder::new(ConcreteDataType::float64_datatype()); + builder.push(&Value::Float64(OrderedFloat(2.0))); + let vector = builder.finish(); + assert_eq!(Value::Float64(OrderedFloat(2.0)), vector.get(0)); + } + + #[test] + fn test_binary_vector_builder() { + let hello: &[u8] = b"hello"; + let mut builder = VectorBuilder::new(ConcreteDataType::binary_datatype()); + builder.push(&Value::Binary(hello.into())); + let vector = builder.finish(); + assert_eq!(Value::Binary(hello.into()), vector.get(0)); + } + + #[test] + fn test_string_vector_builder() { + let hello = "hello"; + let mut builder = VectorBuilder::new(ConcreteDataType::string_datatype()); + builder.push(&Value::String(hello.into())); + let vector = builder.finish(); + assert_eq!(Value::String(hello.into()), vector.get(0)); + } +} diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 165a3626d4..08f64c4fda 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -209,6 +209,19 @@ impl PrimitiveVectorBuilder { } } +pub type UInt8VectorBuilder = PrimitiveVectorBuilder; +pub type UInt16VectorBuilder = PrimitiveVectorBuilder; +pub type UInt32VectorBuilder = PrimitiveVectorBuilder; +pub type UInt64VectorBuilder = PrimitiveVectorBuilder; + +pub type Int8VectorBuilder = PrimitiveVectorBuilder; +pub type Int16VectorBuilder = PrimitiveVectorBuilder; +pub type Int32VectorBuilder = PrimitiveVectorBuilder; +pub type Int64VectorBuilder = PrimitiveVectorBuilder; + +pub type Float32VectorBuilder = PrimitiveVectorBuilder; +pub type Float64VectorBuilder = PrimitiveVectorBuilder; + impl MutableVector for PrimitiveVectorBuilder { fn data_type(&self) -> ConcreteDataType { T::build_data_type() diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index fc54ad9def..e0e88d67a7 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -2,7 +2,7 @@ mod engine; mod error; -mod memtable; +pub mod memtable; pub mod metadata; mod region; mod region_writer; diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index adb670a6db..d828398adf 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -1,11 +1,14 @@ mod btree; mod inserter; mod schema; +#[cfg(test)] +mod tests; use std::mem; use std::sync::Arc; -use datatypes::vectors::VectorRef; +use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef}; +use snafu::Snafu; use store_api::storage::{SequenceNumber, ValueType}; use crate::error::Result; @@ -20,15 +23,66 @@ pub trait Memtable: Send + Sync { /// Write key/values to the memtable. /// /// # Panics - /// Panic if the schema of key/value differs from memtable's schema. + /// Panics if the schema of key/value differs from memtable's schema. fn write(&self, kvs: &KeyValues) -> Result<()>; + /// Iterates the memtable. + // TODO(yingwen): Consider passing a projector (does column projection). + fn iter(&self, ctx: IterContext) -> Result; + /// Returns the estimated bytes allocated by this memtable from heap. fn bytes_allocated(&self) -> usize; } pub type MemtableRef = Arc; +/// Context for iterating memtable. +#[derive(Debug, Clone)] +pub struct IterContext { + /// The suggested batch size of the iterator. + pub batch_size: usize, +} + +impl Default for IterContext { + fn default() -> Self { + Self { batch_size: 256 } + } +} + +/// The ordering of the iterator output. +#[derive(Debug, PartialEq)] +pub enum RowOrdering { + /// The output rows are unordered. + Unordered, + + /// The output rows are ordered by key. + Key, +} + +pub struct Batch { + pub keys: Vec, + pub sequences: UInt64Vector, + pub value_types: UInt8Vector, + pub values: Vec, +} + +/// Iterator of memtable. +pub trait BatchIterator: Send { + /// Returns the schema of this iterator. + fn schema(&self) -> &MemtableSchema; + + /// Returns the ordering of the output rows from this iterator. + fn ordering(&self) -> RowOrdering; + + /// Fetch next batch from the memtable. + /// + /// # Panics + /// Panics if the iterator has already been exhausted. + fn next(&mut self) -> Result>; +} + +pub type BatchIteratorPtr = Box; + pub trait MemtableBuilder: Send + Sync { fn build(&self, schema: MemtableSchema) -> MemtableRef; } @@ -54,12 +108,14 @@ impl KeyValues { self.keys.clear(); self.values.clear(); } -} -impl KeyValues { pub fn len(&self) -> usize { self.keys.first().map(|v| v.len()).unwrap_or_default() } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } pub struct DefaultMemtableBuilder {} @@ -70,6 +126,10 @@ impl MemtableBuilder for DefaultMemtableBuilder { } } +#[derive(Debug, Snafu)] +#[snafu(display("Fail to switch memtable"))] +pub struct SwitchError; + pub struct MemtableSet { mem: MemtableRef, // TODO(yingwen): Support multiple immutable memtables. @@ -86,9 +146,12 @@ impl MemtableSet { } /// Switch mutable memtable to immutable memtable, returns the old mutable memtable if success. - pub fn _switch_memtable(&mut self, mem: &MemtableRef) -> std::result::Result { + pub fn _switch_memtable( + &mut self, + mem: &MemtableRef, + ) -> std::result::Result { match &self._immem { - Some(_) => Err(()), + Some(_) => SwitchSnafu {}.fail(), None => { let old_mem = mem::replace(&mut self.mem, mem.clone()); self._immem = Some(old_mem.clone()); diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 4c1382837f..4d3464a4d8 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -1,26 +1,34 @@ use std::cmp::Ordering; -use std::collections::BTreeMap; -use std::sync::RwLock; +use std::collections::{btree_map, BTreeMap}; +use std::ops::Bound; +use std::sync::{Arc, RwLock}; +use datatypes::prelude::*; use datatypes::value::Value; +use datatypes::vectors::{UInt64VectorBuilder, UInt8VectorBuilder, VectorBuilder}; use store_api::storage::{SequenceNumber, ValueType}; use crate::error::Result; -use crate::memtable::{KeyValues, Memtable, MemtableSchema}; +use crate::memtable::{ + Batch, BatchIterator, BatchIteratorPtr, IterContext, KeyValues, Memtable, MemtableSchema, + RowOrdering, +}; + +type RwLockMap = RwLock>; /// A simple memtable implementation based on std's [`BTreeMap`]. /// -/// Mainly for test purpose. +/// Mainly for test purpose, don't use in production. pub struct BTreeMemtable { schema: MemtableSchema, - map: RwLock>, + map: Arc, } impl BTreeMemtable { pub fn new(schema: MemtableSchema) -> BTreeMemtable { BTreeMemtable { schema, - map: RwLock::new(BTreeMap::new()), + map: Arc::new(RwLock::new(BTreeMap::new())), } } } @@ -34,18 +42,133 @@ impl Memtable for BTreeMemtable { let mut map = self.map.write().unwrap(); let iter_row = IterRow::new(kvs); - for (row_key, row_value) in iter_row { - map.insert(row_key, row_value); + for (inner_key, row_value) in iter_row { + map.insert(inner_key, row_value); } Ok(()) } + fn iter(&self, ctx: IterContext) -> Result { + assert!(ctx.batch_size > 0); + + let iter = BTreeIterator::new(ctx, self.schema.clone(), self.map.clone()); + + Ok(Box::new(iter)) + } + fn bytes_allocated(&self) -> usize { unimplemented!() } } +struct BTreeIterator { + ctx: IterContext, + schema: MemtableSchema, + map: Arc, + last_key: Option, +} + +impl BatchIterator for BTreeIterator { + fn schema(&self) -> &MemtableSchema { + &self.schema + } + + fn ordering(&self) -> RowOrdering { + RowOrdering::Key + } + + fn next(&mut self) -> Result> { + Ok(self.next_batch()) + } +} + +impl BTreeIterator { + fn new(ctx: IterContext, schema: MemtableSchema, map: Arc) -> BTreeIterator { + BTreeIterator { + ctx, + schema, + map, + last_key: None, + } + } + + fn next_batch(&mut self) -> Option { + let map = self.map.read().unwrap(); + let iter = if let Some(last_key) = &self.last_key { + map.range((Bound::Excluded(last_key), Bound::Unbounded)) + } else { + map.range(..) + }; + let iter = MapIterWrapper::new(iter); + + let mut keys = Vec::with_capacity(self.ctx.batch_size); + let mut sequences = UInt64VectorBuilder::with_capacity(self.ctx.batch_size); + let mut value_types = UInt8VectorBuilder::with_capacity(self.ctx.batch_size); + let mut values = Vec::with_capacity(self.ctx.batch_size); + for (inner_key, row_value) in iter.take(self.ctx.batch_size) { + keys.push(inner_key); + sequences.push(Some(inner_key.sequence)); + value_types.push(Some(inner_key.value_type.as_u8())); + values.push(row_value); + } + + if keys.is_empty() { + return None; + } + self.last_key = keys.last().map(|k| (*k).clone()); + + Some(Batch { + keys: rows_to_vectors(keys.as_slice()), + sequences: sequences.finish(), + value_types: value_types.finish(), + values: rows_to_vectors(values.as_slice()), + }) + } +} + +/// `MapIterWrapper` removes same user key with elder sequence. +struct MapIterWrapper<'a, InnerKey, RowValue> { + iter: btree_map::Range<'a, InnerKey, RowValue>, + prev_key: Option, +} + +impl<'a> MapIterWrapper<'a, InnerKey, RowValue> { + fn new( + iter: btree_map::Range<'a, InnerKey, RowValue>, + ) -> MapIterWrapper<'a, InnerKey, RowValue> { + MapIterWrapper { + iter, + prev_key: None, + } + } +} + +impl<'a> Iterator for MapIterWrapper<'a, InnerKey, RowValue> { + type Item = (&'a InnerKey, &'a RowValue); + + fn next(&mut self) -> Option<(&'a InnerKey, &'a RowValue)> { + let (mut current_key, mut current_value) = self.iter.next()?; + if self.prev_key.is_none() { + self.prev_key = Some(current_key.clone()); + return Some((current_key, current_value)); + } + + let prev_key = self.prev_key.take().unwrap(); + while prev_key.is_row_key_equal(current_key) { + if let Some((next_key, next_value)) = self.iter.next() { + (current_key, current_value) = (next_key, next_value); + } else { + return None; + } + } + + self.prev_key = Some(current_key.clone()); + + Some((current_key, current_value)) + } +} + struct IterRow<'a> { kvs: &'a KeyValues, index: usize, @@ -61,22 +184,22 @@ impl<'a> IterRow<'a> { } } - fn fetch_row(&mut self) -> (RowKey, RowValue) { - let keys = self + fn fetch_row(&mut self) -> (InnerKey, RowValue) { + let row_key = self .kvs .keys .iter() .map(|vector| vector.get(self.index)) .collect(); - let row_key = RowKey { - keys, + let inner_key = InnerKey { + row_key, sequence: self.kvs.sequence, index_in_batch: self.kvs.start_index_in_batch + self.index, value_type: self.kvs.value_type, }; let row_value = RowValue { - _values: self + values: self .kvs .values .iter() @@ -84,14 +207,16 @@ impl<'a> IterRow<'a> { .collect(), }; - (row_key, row_value) + self.index += 1; + + (inner_key, row_value) } } impl<'a> Iterator for IterRow<'a> { - type Item = (RowKey, RowValue); + type Item = (InnerKey, RowValue); - fn next(&mut self) -> Option<(RowKey, RowValue)> { + fn next(&mut self) -> Option<(InnerKey, RowValue)> { if self.index >= self.len { return None; } @@ -104,33 +229,99 @@ impl<'a> Iterator for IterRow<'a> { } } -// TODO(yingwen): Actually the version and timestamp may order desc. -#[derive(PartialEq, Eq)] -struct RowKey { - keys: Vec, +#[derive(Clone, Debug, PartialEq, Eq)] +struct InnerKey { + row_key: Vec, sequence: SequenceNumber, index_in_batch: usize, value_type: ValueType, } -impl Ord for RowKey { - fn cmp(&self, other: &RowKey) -> Ordering { - // Order by (keys asc, sequence desc, index_in_batch desc, value type desc), though (key, +impl Ord for InnerKey { + fn cmp(&self, other: &InnerKey) -> Ordering { + // Order by (row_key asc, sequence desc, index_in_batch desc, value type desc), though (key, // sequence, index_in_batch) should be enough to disambiguate. - self.keys - .cmp(&other.keys) + self.row_key + .cmp(&other.row_key) .then_with(|| other.sequence.cmp(&self.sequence)) .then_with(|| other.index_in_batch.cmp(&self.index_in_batch)) .then_with(|| other.value_type.cmp(&self.value_type)) } } -impl PartialOrd for RowKey { - fn partial_cmp(&self, other: &RowKey) -> Option { +impl PartialOrd for InnerKey { + fn partial_cmp(&self, other: &InnerKey) -> Option { Some(self.cmp(other)) } } -struct RowValue { - _values: Vec, +impl InnerKey { + fn is_row_key_equal(&self, other: &InnerKey) -> bool { + self.row_key == other.row_key + } +} + +#[derive(Clone, Debug)] +struct RowValue { + values: Vec, +} + +trait RowsProvider { + fn row_num(&self) -> usize; + + fn column_num(&self) -> usize { + self.row_by_index(0).len() + } + + fn is_empty(&self) -> bool { + self.row_num() == 0 + } + + fn row_by_index(&self, idx: usize) -> &Vec; +} + +impl<'a> RowsProvider for &'a [&InnerKey] { + fn row_num(&self) -> usize { + self.len() + } + + fn row_by_index(&self, idx: usize) -> &Vec { + &self[idx].row_key + } +} + +impl<'a> RowsProvider for &'a [&RowValue] { + fn row_num(&self) -> usize { + self.len() + } + + fn row_by_index(&self, idx: usize) -> &Vec { + &self[idx].values + } +} + +fn rows_to_vectors(provider: T) -> Vec { + if provider.is_empty() { + return Vec::new(); + } + + let column_num = provider.column_num(); + let row_num = provider.row_num(); + let mut builders = Vec::with_capacity(column_num); + for v in provider.row_by_index(0) { + builders.push(VectorBuilder::with_capacity(v.data_type(), row_num)); + } + + let mut vectors = Vec::with_capacity(column_num); + for (col_idx, builder) in builders.iter_mut().enumerate() { + for row_idx in 0..row_num { + let row = provider.row_by_index(row_idx); + let value = &row[col_idx]; + builder.push(value); + } + + vectors.push(builder.finish()); + } + + vectors } diff --git a/src/storage/src/memtable/schema.rs b/src/storage/src/memtable/schema.rs index 2b2dc204e6..e23c210de0 100644 --- a/src/storage/src/memtable/schema.rs +++ b/src/storage/src/memtable/schema.rs @@ -1,5 +1,6 @@ use crate::metadata::{ColumnMetadata, ColumnsRowKeyMetadataRef}; +#[derive(Clone, Debug, PartialEq)] pub struct MemtableSchema { columns_row_key: ColumnsRowKeyMetadataRef, } diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs new file mode 100644 index 0000000000..13349a9449 --- /dev/null +++ b/src/storage/src/memtable/tests.rs @@ -0,0 +1,304 @@ +use datatypes::prelude::*; +use datatypes::type_id::LogicalTypeId; +use datatypes::vectors::{Int64VectorBuilder, UInt64VectorBuilder}; + +use super::*; +use crate::metadata::RegionMetadata; +use crate::test_util::descriptor_util::RegionDescBuilder; + +// Schema for testing memtable: +// - key: Int64(timestamp), UInt64(version), +// - value: UInt64 +fn schema_for_test() -> MemtableSchema { + // Just build a region desc and use its columns_row_key metadata. + let desc = RegionDescBuilder::new("test") + .push_value_column(("v1", LogicalTypeId::UInt64, true)) + .build(); + let metadata: RegionMetadata = desc.try_into().unwrap(); + + MemtableSchema::new(metadata.columns_row_key) +} + +fn kvs_for_test_with_index( + sequence: SequenceNumber, + value_type: ValueType, + start_index_in_batch: usize, + keys: &[(i64, u64)], + values: &[Option], +) -> KeyValues { + assert_eq!(keys.len(), values.len()); + + let mut key_builders = ( + Int64VectorBuilder::with_capacity(keys.len()), + UInt64VectorBuilder::with_capacity(keys.len()), + ); + for key in keys { + key_builders.0.push(Some(key.0)); + key_builders.1.push(Some(key.1)); + } + let row_keys = vec![ + Arc::new(key_builders.0.finish()) as _, + Arc::new(key_builders.1.finish()) as _, + ]; + + let mut value_builder = UInt64VectorBuilder::with_capacity(values.len()); + for value in values { + value_builder.push(*value); + } + let row_values = vec![Arc::new(value_builder.finish()) as _]; + + let kvs = KeyValues { + sequence, + value_type, + start_index_in_batch, + keys: row_keys, + values: row_values, + }; + + assert_eq!(keys.len(), kvs.len()); + assert_eq!(keys.is_empty(), kvs.is_empty()); + + kvs +} + +fn kvs_for_test( + sequence: SequenceNumber, + value_type: ValueType, + keys: &[(i64, u64)], + values: &[Option], +) -> KeyValues { + kvs_for_test_with_index(sequence, value_type, 0, keys, values) +} + +fn write_kvs( + memtable: &dyn Memtable, + sequence: SequenceNumber, + value_type: ValueType, + keys: &[(i64, u64)], + values: &[Option], +) { + let kvs = kvs_for_test(sequence, value_type, keys, values); + + memtable.write(&kvs).unwrap(); +} + +fn check_batch_valid(batch: &Batch) { + assert_eq!(2, batch.keys.len()); + assert_eq!(1, batch.values.len()); + let row_num = batch.keys[0].len(); + assert_eq!(row_num, batch.keys[1].len()); + assert_eq!(row_num, batch.sequences.len()); + assert_eq!(row_num, batch.value_types.len()); + assert_eq!(row_num, batch.values[0].len()); +} + +fn check_iter_content( + iter: &mut dyn BatchIterator, + keys: &[(i64, u64)], + sequences: &[u64], + value_types: &[ValueType], + values: &[Option], +) { + let mut index = 0; + while let Some(batch) = iter.next().unwrap() { + check_batch_valid(&batch); + + let row_num = batch.keys[0].len(); + for i in 0..row_num { + let (k0, k1) = (batch.keys[0].get(i), batch.keys[1].get(i)); + let sequence = batch.sequences.get_data(i).unwrap(); + let value_type = batch.value_types.get_data(i).unwrap(); + let v = batch.values[0].get(i); + + assert_eq!(Value::from(keys[index].0), k0); + assert_eq!(Value::from(keys[index].1), k1); + assert_eq!(sequences[index], sequence); + assert_eq!(value_types[index].as_u8(), value_type); + assert_eq!(Value::from(values[index]), v); + + index += 1; + } + } + + assert_eq!(keys.len(), index); +} + +// TODO(yingwen): Check size of the returned batch. + +struct MemtableTester { + schema: MemtableSchema, + builders: Vec, +} + +impl Default for MemtableTester { + fn default() -> MemtableTester { + MemtableTester::new() + } +} + +impl MemtableTester { + fn new() -> MemtableTester { + let schema = schema_for_test(); + let builders = vec![Arc::new(DefaultMemtableBuilder {}) as _]; + + MemtableTester { schema, builders } + } + + fn new_memtables(&self) -> Vec { + self.builders + .iter() + .map(|b| b.build(self.schema.clone())) + .collect() + } + + fn run_testcase(&self, testcase: F) + where + F: Fn(TestContext), + { + for memtable in self.new_memtables() { + let test_ctx = TestContext { + schema: self.schema.clone(), + memtable, + }; + + testcase(test_ctx); + } + } +} + +struct TestContext { + schema: MemtableSchema, + memtable: MemtableRef, +} + +fn write_iter_memtable_case(ctx: &TestContext) { + // Test iterating an empty memtable. + let mut iter = ctx.memtable.iter(IterContext::default()).unwrap(); + assert!(iter.next().unwrap().is_none()); + + // Init test data. + write_kvs( + &*ctx.memtable, + 10, // sequence + ValueType::Put, + &[ + (1000, 1), + (1000, 2), + (2002, 1), + (2003, 1), + (2003, 5), + (1001, 1), + ], // keys + &[Some(1), Some(2), Some(7), Some(8), Some(9), Some(3)], // values + ); + write_kvs( + &*ctx.memtable, + 11, // sequence + ValueType::Put, + &[(1002, 1), (1003, 1), (1004, 1)], // keys + &[None, Some(5), None], // values + ); + + let batch_sizes = [1, 4, 8, 256]; + for batch_size in batch_sizes { + let iter_ctx = IterContext { batch_size }; + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + assert_eq!(ctx.schema, *iter.schema()); + assert_eq!(RowOrdering::Key, iter.ordering()); + + check_iter_content( + &mut *iter, + &[ + (1000, 1), + (1000, 2), + (1001, 1), + (1002, 1), + (1003, 1), + (1004, 1), + (2002, 1), + (2003, 1), + (2003, 5), + ], // keys + &[10, 10, 10, 11, 11, 11, 10, 10, 10], // sequences + &[ + ValueType::Put, + ValueType::Put, + ValueType::Put, + ValueType::Put, + ValueType::Put, + ValueType::Put, + ValueType::Put, + ValueType::Put, + ValueType::Put, + ], // value types + &[ + Some(1), + Some(2), + Some(3), + None, + Some(5), + None, + Some(7), + Some(8), + Some(9), + ], // values + ); + } +} + +#[test] +fn test_write_iter_memtable() { + let tester = MemtableTester::default(); + tester.run_testcase(|ctx| { + write_iter_memtable_case(&ctx); + }); +} + +fn check_iter_batch_size(iter: &mut dyn BatchIterator, total: usize, batch_size: usize) { + let mut remains = total; + while let Some(batch) = iter.next().unwrap() { + check_batch_valid(&batch); + + let row_num = batch.keys[0].len(); + if remains >= batch_size { + assert_eq!(batch_size, row_num); + remains -= batch_size; + } else { + assert_eq!(remains, row_num); + remains = 0; + } + } + + assert_eq!(0, remains); +} + +#[test] +fn test_iter_batch_size() { + let tester = MemtableTester::default(); + tester.run_testcase(|ctx| { + write_kvs( + &*ctx.memtable, + 10, // sequence + ValueType::Put, + &[ + (1000, 1), + (1000, 2), + (1001, 1), + (2002, 1), + (2003, 1), + (2003, 5), + ], // keys + &[Some(1), Some(2), Some(3), Some(4), None, None], // values + ); + + let total = 6; + // Batch size [less than, equal to, greater than] total + let batch_sizes = [1, 6, 8]; + for batch_size in batch_sizes { + let iter_ctx = IterContext { batch_size }; + + let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + check_iter_batch_size(&mut *iter, total, batch_size); + } + }); +} diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs index d222d7b14a..8bab1c513c 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -48,7 +48,7 @@ pub type VersionNumber = u32; // TODO(yingwen): Make some fields of metadata private. /// In memory metadata of region. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RegionMetadata { /// Schema of the region. /// @@ -66,13 +66,13 @@ pub struct RegionMetadata { pub type RegionMetadataRef = Arc; -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq)] pub struct ColumnMetadata { pub cf_id: ColumnFamilyId, pub desc: ColumnDescriptor, } -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq)] pub struct ColumnsMetadata { /// All columns, in `(key columns, timestamp, [version,] value columns)` order. /// @@ -82,7 +82,7 @@ pub struct ColumnsMetadata { pub name_to_col_index: HashMap, } -#[derive(Default, Clone)] +#[derive(Clone, Debug, Default, PartialEq)] pub struct RowKeyMetadata { /// Exclusive end index of row key columns. row_key_end: usize, @@ -93,7 +93,7 @@ pub struct RowKeyMetadata { pub enable_version_column: bool, } -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq)] pub struct ColumnsRowKeyMetadata { columns: ColumnsMetadata, row_key: RowKeyMetadata, @@ -121,7 +121,7 @@ impl ColumnsRowKeyMetadata { pub type ColumnsRowKeyMetadataRef = Arc; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ColumnFamiliesMetadata { /// Map column family id to column family metadata. id_to_cfs: HashMap, @@ -133,7 +133,7 @@ impl ColumnFamiliesMetadata { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ColumnFamilyMetadata { /// Column family name. pub name: String, diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index 97d5b08b60..b9d0ecb2b8 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -9,7 +9,7 @@ pub type ColumnFamilyId = u32; // TODO(yingwen): Validate default value has same type with column, and name is a valid column name. /// A [ColumnDescriptor] contains information to create a column. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct ColumnDescriptor { pub id: ColumnId, pub name: String, @@ -29,7 +29,7 @@ impl From<&ColumnDescriptor> for ColumnSchema { } /// A [RowKeyDescriptor] contains information about row key. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct RowKeyDescriptor { pub columns: Vec, /// Timestamp key column. @@ -41,7 +41,7 @@ pub struct RowKeyDescriptor { } /// A [ColumnFamilyDescriptor] contains information to create a column family. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct ColumnFamilyDescriptor { pub cf_id: ColumnFamilyId, pub name: String, @@ -50,7 +50,7 @@ pub struct ColumnFamilyDescriptor { } /// A [RegionDescriptor] contains information to create a region. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct RegionDescriptor { /// Region name. pub name: String, diff --git a/src/store-api/src/storage/types.rs b/src/store-api/src/storage/types.rs index e2679b1e6d..361ba7caca 100644 --- a/src/store-api/src/storage/types.rs +++ b/src/store-api/src/storage/types.rs @@ -10,3 +10,9 @@ pub enum ValueType { /// Put operation. Put, } + +impl ValueType { + pub fn as_u8(&self) -> u8 { + *self as u8 + } +}