Merge pull request #42 from GrepTimeTeam/feat/storage/memtable/iter

feat: Add BatchIterator trait and support iterating btree memtable
This commit is contained in:
fengjiachun
2022-06-14 17:21:18 +08:00
committed by GitHub
13 changed files with 948 additions and 54 deletions

View File

@@ -40,7 +40,7 @@ macro_rules! for_all_primitive_types{
#[macro_export]
macro_rules! with_match_primitive_type_id {
($key_type:expr, | $_:tt $T:ident | $body:tt, $nbody:tt) => {{
($key_type:expr, | $_:tt $T:ident | $body:tt, $nbody:tt) => {{
macro_rules! __with_ty__ {
( $_ $T:ident ) => {
$body

View File

@@ -1,7 +1,9 @@
use common_base::bytes::{Bytes, StringBytes};
use ordered_float::OrderedFloat;
pub use ordered_float::OrderedFloat;
use serde::{Serialize, Serializer};
use crate::data_type::ConcreteDataType;
pub type OrderedF32 = OrderedFloat<f32>;
pub type OrderedF64 = OrderedFloat<f64>;
@@ -36,6 +38,38 @@ pub enum Value {
DateTime(i64),
}
impl Value {
    /// Maps this value's variant to the matching [`ConcreteDataType`].
    ///
    /// # Panics
    /// Panics if the data type is not supported, which is the case for the
    /// `Date` and `DateTime` variants.
    pub fn data_type(&self) -> ConcreteDataType {
        match self {
            Self::Null => ConcreteDataType::null_datatype(),
            Self::Boolean(_) => ConcreteDataType::boolean_datatype(),
            Self::UInt8(_) => ConcreteDataType::uint8_datatype(),
            Self::UInt16(_) => ConcreteDataType::uint16_datatype(),
            Self::UInt32(_) => ConcreteDataType::uint32_datatype(),
            Self::UInt64(_) => ConcreteDataType::uint64_datatype(),
            Self::Int8(_) => ConcreteDataType::int8_datatype(),
            Self::Int16(_) => ConcreteDataType::int16_datatype(),
            Self::Int32(_) => ConcreteDataType::int32_datatype(),
            Self::Int64(_) => ConcreteDataType::int64_datatype(),
            Self::Float32(_) => ConcreteDataType::float32_datatype(),
            Self::Float64(_) => ConcreteDataType::float64_datatype(),
            Self::String(_) => ConcreteDataType::string_datatype(),
            Self::Binary(_) => ConcreteDataType::binary_datatype(),
            Self::Date(_) | Self::DateTime(_) => {
                unimplemented!("Unsupported data type of value {:?}", self)
            }
        }
    }

    /// Returns `true` only for the `Null` variant.
    pub fn is_null(&self) -> bool {
        matches!(*self, Self::Null)
    }
}
macro_rules! impl_from {
($Variant:ident, $Type:ident) => {
impl From<$Type> for Value {
@@ -180,6 +214,66 @@ mod tests {
assert_eq!(Value::Binary(bytes.clone()), Value::from(bytes));
}
#[test]
fn test_value_datatype() {
    // Each entry pairs the expected data type with a value of that type.
    let cases = [
        (ConcreteDataType::boolean_datatype(), Value::Boolean(true)),
        (ConcreteDataType::uint8_datatype(), Value::UInt8(u8::MIN)),
        (ConcreteDataType::uint16_datatype(), Value::UInt16(u16::MIN)),
        (ConcreteDataType::uint16_datatype(), Value::UInt16(u16::MAX)),
        (ConcreteDataType::uint32_datatype(), Value::UInt32(u32::MIN)),
        (ConcreteDataType::uint64_datatype(), Value::UInt64(u64::MIN)),
        (ConcreteDataType::int8_datatype(), Value::Int8(i8::MIN)),
        (ConcreteDataType::int16_datatype(), Value::Int16(i16::MIN)),
        (ConcreteDataType::int32_datatype(), Value::Int32(i32::MIN)),
        (ConcreteDataType::int64_datatype(), Value::Int64(i64::MIN)),
        (
            ConcreteDataType::float32_datatype(),
            Value::Float32(OrderedFloat(f32::MIN)),
        ),
        (
            ConcreteDataType::float64_datatype(),
            Value::Float64(OrderedFloat(f64::MIN)),
        ),
        (
            ConcreteDataType::string_datatype(),
            Value::String(StringBytes::from("hello")),
        ),
        (
            ConcreteDataType::binary_datatype(),
            Value::Binary(Bytes::from(b"world".as_slice())),
        ),
    ];

    for (expected, value) in cases {
        assert_eq!(expected, value.data_type());
    }
}
#[test]
fn test_value_from_string() {
let hello = "hello".to_string();

View File

@@ -1,5 +1,6 @@
pub mod binary;
pub mod boolean;
mod builder;
pub mod constant;
mod helper;
pub mod mutable;
@@ -14,6 +15,7 @@ use arrow::array::ArrayRef;
use arrow::bitmap::Bitmap;
pub use binary::*;
pub use boolean::*;
pub use builder::VectorBuilder;
pub use constant::*;
pub use helper::Helper;
pub use mutable::MutableVector;
@@ -26,11 +28,6 @@ use crate::data_type::ConcreteDataType;
use crate::error::{BadArrayAccessSnafu, Result};
use crate::serialize::Serializable;
use crate::value::Value;
pub use crate::vectors::{
BinaryVector, BooleanVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector,
Int64Vector, Int8Vector, NullVector, StringVector, UInt16Vector, UInt32Vector, UInt64Vector,
UInt8Vector,
};
#[derive(Debug, PartialEq)]
pub enum Validity<'a> {

View File

@@ -0,0 +1,225 @@
use std::sync::Arc;
use crate::data_type::ConcreteDataType;
use crate::scalars::ScalarVectorBuilder;
use crate::value::Value;
use crate::vectors::{
BinaryVectorBuilder, BooleanVectorBuilder, Float32VectorBuilder, Float64VectorBuilder,
Int16VectorBuilder, Int32VectorBuilder, Int64VectorBuilder, Int8VectorBuilder, NullVector,
StringVectorBuilder, UInt16VectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
UInt8VectorBuilder, VectorRef,
};
/// A builder that assembles a vector from dynamically typed [`Value`]s.
///
/// There is one variant per supported data type; every variant except `Null`
/// wraps the typed builder for that data type.
pub enum VectorBuilder {
/// Builder for the null type; the payload counts the nulls pushed so far.
Null(usize),
// Numeric types:
Boolean(BooleanVectorBuilder),
UInt8(UInt8VectorBuilder),
UInt16(UInt16VectorBuilder),
UInt32(UInt32VectorBuilder),
UInt64(UInt64VectorBuilder),
Int8(Int8VectorBuilder),
Int16(Int16VectorBuilder),
Int32(Int32VectorBuilder),
Int64(Int64VectorBuilder),
Float32(Float32VectorBuilder),
Float64(Float64VectorBuilder),
// String types:
String(StringVectorBuilder),
Binary(BinaryVectorBuilder),
}
impl VectorBuilder {
pub fn new(data_type: ConcreteDataType) -> VectorBuilder {
VectorBuilder::with_capacity(data_type, 0)
}
pub fn with_capacity(data_type: ConcreteDataType, capacity: usize) -> VectorBuilder {
match data_type {
ConcreteDataType::Null(_) => VectorBuilder::Null(0),
ConcreteDataType::Boolean(_) => {
VectorBuilder::Boolean(BooleanVectorBuilder::with_capacity(capacity))
}
ConcreteDataType::UInt8(_) => {
VectorBuilder::UInt8(UInt8VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::UInt16(_) => {
VectorBuilder::UInt16(UInt16VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::UInt32(_) => {
VectorBuilder::UInt32(UInt32VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::UInt64(_) => {
VectorBuilder::UInt64(UInt64VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::Int8(_) => {
VectorBuilder::Int8(Int8VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::Int16(_) => {
VectorBuilder::Int16(Int16VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::Int32(_) => {
VectorBuilder::Int32(Int32VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::Int64(_) => {
VectorBuilder::Int64(Int64VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::Float32(_) => {
VectorBuilder::Float32(Float32VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::Float64(_) => {
VectorBuilder::Float64(Float64VectorBuilder::with_capacity(capacity))
}
ConcreteDataType::String(_) => {
VectorBuilder::String(StringVectorBuilder::with_capacity(capacity))
}
ConcreteDataType::Binary(_) => {
VectorBuilder::Binary(BinaryVectorBuilder::with_capacity(capacity))
}
}
}
pub fn push(&mut self, value: &Value) {
if value.is_null() {
self.push_null();
return;
}
match (self, value) {
(VectorBuilder::Boolean(b), Value::Boolean(v)) => b.push(Some(*v)),
(VectorBuilder::UInt8(b), Value::UInt8(v)) => b.push(Some(*v)),
(VectorBuilder::UInt16(b), Value::UInt16(v)) => b.push(Some(*v)),
(VectorBuilder::UInt32(b), Value::UInt32(v)) => b.push(Some(*v)),
(VectorBuilder::UInt64(b), Value::UInt64(v)) => b.push(Some(*v)),
(VectorBuilder::Int8(b), Value::Int8(v)) => b.push(Some(*v)),
(VectorBuilder::Int16(b), Value::Int16(v)) => b.push(Some(*v)),
(VectorBuilder::Int32(b), Value::Int32(v)) => b.push(Some(*v)),
(VectorBuilder::Int64(b), Value::Int64(v)) => b.push(Some(*v)),
(VectorBuilder::Float32(b), Value::Float32(v)) => b.push(Some(v.into_inner())),
(VectorBuilder::Float64(b), Value::Float64(v)) => b.push(Some(v.into_inner())),
(VectorBuilder::String(b), Value::String(v)) => b.push(Some(v.as_utf8())),
(VectorBuilder::Binary(b), Value::Binary(v)) => b.push(Some(v)),
_ => panic!("Value {:?} does not match builder type", value),
}
}
pub fn push_null(&mut self) {
match self {
VectorBuilder::Null(v) => *v += 1,
VectorBuilder::Boolean(b) => b.push(None),
VectorBuilder::UInt8(b) => b.push(None),
VectorBuilder::UInt16(b) => b.push(None),
VectorBuilder::UInt32(b) => b.push(None),
VectorBuilder::UInt64(b) => b.push(None),
VectorBuilder::Int8(b) => b.push(None),
VectorBuilder::Int16(b) => b.push(None),
VectorBuilder::Int32(b) => b.push(None),
VectorBuilder::Int64(b) => b.push(None),
VectorBuilder::Float32(b) => b.push(None),
VectorBuilder::Float64(b) => b.push(None),
VectorBuilder::String(b) => b.push(None),
VectorBuilder::Binary(b) => b.push(None),
}
}
pub fn finish(&mut self) -> VectorRef {
match self {
VectorBuilder::Null(v) => Arc::new(NullVector::new(*v)),
VectorBuilder::Boolean(b) => Arc::new(b.finish()),
VectorBuilder::UInt8(b) => Arc::new(b.finish()),
VectorBuilder::UInt16(b) => Arc::new(b.finish()),
VectorBuilder::UInt32(b) => Arc::new(b.finish()),
VectorBuilder::UInt64(b) => Arc::new(b.finish()),
VectorBuilder::Int8(b) => Arc::new(b.finish()),
VectorBuilder::Int16(b) => Arc::new(b.finish()),
VectorBuilder::Int32(b) => Arc::new(b.finish()),
VectorBuilder::Int64(b) => Arc::new(b.finish()),
VectorBuilder::Float32(b) => Arc::new(b.finish()),
VectorBuilder::Float64(b) => Arc::new(b.finish()),
VectorBuilder::String(b) => Arc::new(b.finish()),
VectorBuilder::Binary(b) => Arc::new(b.finish()),
}
}
}
#[cfg(test)]
mod tests {
    use ordered_float::OrderedFloat;

    use super::*;

    /// Checks that an integer builder round-trips the values 0..10, and that a
    /// null followed by a value is stored as such.
    macro_rules! impl_integer_builder_test {
        ($Type: ident, $datatype: ident) => {
            let mut dense = VectorBuilder::with_capacity(ConcreteDataType::$datatype(), 10);
            for n in 0..10 {
                dense.push(&Value::$Type(n));
            }
            let dense_vector = dense.finish();
            for n in 0..10 {
                assert_eq!(Value::$Type(n), dense_vector.get(n as usize));
            }

            let mut sparse = VectorBuilder::new(ConcreteDataType::$datatype());
            sparse.push(&Value::Null);
            sparse.push(&Value::$Type(100));
            let sparse_vector = sparse.finish();
            assert!(sparse_vector.is_null(0));
            assert_eq!(Value::$Type(100), sparse_vector.get(1));
        };
    }

    #[test]
    fn test_null_vector_builder() {
        let mut null_builder = VectorBuilder::new(ConcreteDataType::null_datatype());
        null_builder.push(&Value::Null);
        let nulls = null_builder.finish();
        assert!(nulls.is_null(0));
    }

    #[test]
    fn test_integer_vector_builder() {
        impl_integer_builder_test!(UInt8, uint8_datatype);
        impl_integer_builder_test!(UInt16, uint16_datatype);
        impl_integer_builder_test!(UInt32, uint32_datatype);
        impl_integer_builder_test!(UInt64, uint64_datatype);

        impl_integer_builder_test!(Int8, int8_datatype);
        impl_integer_builder_test!(Int16, int16_datatype);
        impl_integer_builder_test!(Int32, int32_datatype);
        impl_integer_builder_test!(Int64, int64_datatype);
    }

    #[test]
    fn test_float_vector_builder() {
        let mut f32_builder = VectorBuilder::new(ConcreteDataType::float32_datatype());
        f32_builder.push(&Value::Float32(OrderedFloat(1.0)));
        let f32_vector = f32_builder.finish();
        assert_eq!(Value::Float32(OrderedFloat(1.0)), f32_vector.get(0));

        let mut f64_builder = VectorBuilder::new(ConcreteDataType::float64_datatype());
        f64_builder.push(&Value::Float64(OrderedFloat(2.0)));
        let f64_vector = f64_builder.finish();
        assert_eq!(Value::Float64(OrderedFloat(2.0)), f64_vector.get(0));
    }

    #[test]
    fn test_binary_vector_builder() {
        let payload: &[u8] = b"hello";

        let mut binary_builder = VectorBuilder::new(ConcreteDataType::binary_datatype());
        binary_builder.push(&Value::Binary(payload.into()));
        let binary_vector = binary_builder.finish();
        assert_eq!(Value::Binary(payload.into()), binary_vector.get(0));
    }

    #[test]
    fn test_string_vector_builder() {
        let text = "hello";

        let mut string_builder = VectorBuilder::new(ConcreteDataType::string_datatype());
        string_builder.push(&Value::String(text.into()));
        let string_vector = string_builder.finish();
        assert_eq!(Value::String(text.into()), string_vector.get(0));
    }
}

View File

@@ -209,6 +209,19 @@ impl<T: Primitive + DataTypeBuilder> PrimitiveVectorBuilder<T> {
}
}
// Aliases of `PrimitiveVectorBuilder` for each supported primitive element type.
pub type UInt8VectorBuilder = PrimitiveVectorBuilder<u8>;
pub type UInt16VectorBuilder = PrimitiveVectorBuilder<u16>;
pub type UInt32VectorBuilder = PrimitiveVectorBuilder<u32>;
pub type UInt64VectorBuilder = PrimitiveVectorBuilder<u64>;
pub type Int8VectorBuilder = PrimitiveVectorBuilder<i8>;
pub type Int16VectorBuilder = PrimitiveVectorBuilder<i16>;
pub type Int32VectorBuilder = PrimitiveVectorBuilder<i32>;
pub type Int64VectorBuilder = PrimitiveVectorBuilder<i64>;
pub type Float32VectorBuilder = PrimitiveVectorBuilder<f32>;
pub type Float64VectorBuilder = PrimitiveVectorBuilder<f64>;
impl<T: Primitive + DataTypeBuilder> MutableVector for PrimitiveVectorBuilder<T> {
fn data_type(&self) -> ConcreteDataType {
T::build_data_type()

View File

@@ -2,7 +2,7 @@
mod engine;
mod error;
mod memtable;
pub mod memtable;
pub mod metadata;
mod region;
mod region_writer;

View File

@@ -1,11 +1,14 @@
mod btree;
mod inserter;
mod schema;
#[cfg(test)]
mod tests;
use std::mem;
use std::sync::Arc;
use datatypes::vectors::VectorRef;
use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef};
use snafu::Snafu;
use store_api::storage::{SequenceNumber, ValueType};
use crate::error::Result;
@@ -20,15 +23,66 @@ pub trait Memtable: Send + Sync {
/// Write key/values to the memtable.
///
/// # Panics
/// Panic if the schema of key/value differs from memtable's schema.
/// Panics if the schema of key/value differs from memtable's schema.
fn write(&self, kvs: &KeyValues) -> Result<()>;
/// Iterates the memtable.
// TODO(yingwen): Consider passing a projector (does column projection).
fn iter(&self, ctx: IterContext) -> Result<BatchIteratorPtr>;
/// Returns the estimated bytes allocated by this memtable from heap.
fn bytes_allocated(&self) -> usize;
}
pub type MemtableRef = Arc<dyn Memtable>;
/// Context for iterating memtable.
///
/// The [`Default`] implementation uses a batch size of 256.
#[derive(Debug, Clone)]
pub struct IterContext {
/// The suggested batch size of the iterator.
///
/// The btree memtable asserts that this is greater than zero when creating
/// an iterator.
pub batch_size: usize,
}
impl Default for IterContext {
fn default() -> Self {
Self { batch_size: 256 }
}
}
/// The ordering of the iterator output.
///
/// This is a fieldless enum that iterators return by value, so it derives
/// `Clone`, `Copy` and `Eq` in addition to `Debug` and `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RowOrdering {
    /// The output rows are unordered.
    Unordered,
    /// The output rows are ordered by key.
    Key,
}
/// A batch of rows returned by a [`BatchIterator`], stored column by column.
///
/// The memtable tests check that every column in a batch has the same length.
pub struct Batch {
/// Row key columns, one vector per key column.
pub keys: Vec<VectorRef>,
/// Sequence number of each row.
pub sequences: UInt64Vector,
/// Value type of each row, encoded as `u8`.
pub value_types: UInt8Vector,
/// Value columns, one vector per value column.
pub values: Vec<VectorRef>,
}
/// Iterator of memtable.
pub trait BatchIterator: Send {
/// Returns the schema of this iterator.
fn schema(&self) -> &MemtableSchema;
/// Returns the ordering of the output rows from this iterator.
fn ordering(&self) -> RowOrdering;
/// Fetches the next batch from the memtable, or `Ok(None)` when there are no
/// more rows.
///
/// # Panics
/// Panics if the iterator has already been exhausted.
// NOTE(review): the btree implementation returns `Ok(None)` rather than
// panicking when called after exhaustion — confirm the intended contract.
fn next(&mut self) -> Result<Option<Batch>>;
}
pub type BatchIteratorPtr = Box<dyn BatchIterator>;
pub trait MemtableBuilder: Send + Sync {
fn build(&self, schema: MemtableSchema) -> MemtableRef;
}
@@ -54,12 +108,14 @@ impl KeyValues {
self.keys.clear();
self.values.clear();
}
}
impl KeyValues {
    /// Returns the number of rows, taken from the length of the first key
    /// column; returns 0 when there are no key columns.
    pub fn len(&self) -> usize {
        self.keys.first().map_or(0, |column| column.len())
    }

    /// Returns `true` when [`KeyValues::len`] is zero.
    pub fn is_empty(&self) -> bool {
        let row_count = self.len();
        row_count == 0
    }
}
pub struct DefaultMemtableBuilder {}
@@ -70,6 +126,10 @@ impl MemtableBuilder for DefaultMemtableBuilder {
}
}
/// Error returned when the mutable memtable cannot be switched to immutable,
/// i.e. when an immutable memtable is already present.
#[derive(Debug, Snafu)]
#[snafu(display("Fail to switch memtable"))]
pub struct SwitchError;
pub struct MemtableSet {
mem: MemtableRef,
// TODO(yingwen): Support multiple immutable memtables.
@@ -86,9 +146,12 @@ impl MemtableSet {
}
/// Switch mutable memtable to immutable memtable, returns the old mutable memtable if success.
pub fn _switch_memtable(&mut self, mem: &MemtableRef) -> std::result::Result<MemtableRef, ()> {
pub fn _switch_memtable(
&mut self,
mem: &MemtableRef,
) -> std::result::Result<MemtableRef, SwitchError> {
match &self._immem {
Some(_) => Err(()),
Some(_) => SwitchSnafu {}.fail(),
None => {
let old_mem = mem::replace(&mut self.mem, mem.clone());
self._immem = Some(old_mem.clone());

View File

@@ -1,26 +1,34 @@
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::RwLock;
use std::collections::{btree_map, BTreeMap};
use std::ops::Bound;
use std::sync::{Arc, RwLock};
use datatypes::prelude::*;
use datatypes::value::Value;
use datatypes::vectors::{UInt64VectorBuilder, UInt8VectorBuilder, VectorBuilder};
use store_api::storage::{SequenceNumber, ValueType};
use crate::error::Result;
use crate::memtable::{KeyValues, Memtable, MemtableSchema};
use crate::memtable::{
Batch, BatchIterator, BatchIteratorPtr, IterContext, KeyValues, Memtable, MemtableSchema,
RowOrdering,
};
type RwLockMap = RwLock<BTreeMap<InnerKey, RowValue>>;
/// A simple memtable implementation based on std's [`BTreeMap`].
///
/// Mainly for test purpose.
/// Mainly for test purpose, don't use in production.
pub struct BTreeMemtable {
schema: MemtableSchema,
map: RwLock<BTreeMap<RowKey, RowValue>>,
map: Arc<RwLockMap>,
}
impl BTreeMemtable {
pub fn new(schema: MemtableSchema) -> BTreeMemtable {
BTreeMemtable {
schema,
map: RwLock::new(BTreeMap::new()),
map: Arc::new(RwLock::new(BTreeMap::new())),
}
}
}
@@ -34,18 +42,133 @@ impl Memtable for BTreeMemtable {
let mut map = self.map.write().unwrap();
let iter_row = IterRow::new(kvs);
for (row_key, row_value) in iter_row {
map.insert(row_key, row_value);
for (inner_key, row_value) in iter_row {
map.insert(inner_key, row_value);
}
Ok(())
}
/// Creates a batch iterator that shares this memtable's map.
///
/// # Panics
/// Panics if `ctx.batch_size` is zero.
fn iter(&self, ctx: IterContext) -> Result<BatchIteratorPtr> {
    assert!(ctx.batch_size > 0);

    let schema = self.schema.clone();
    let shared_map = self.map.clone();
    Ok(Box::new(BTreeIterator::new(ctx, schema, shared_map)))
}
/// Not implemented yet for the btree memtable.
///
/// # Panics
/// Always panics via `unimplemented!()`.
fn bytes_allocated(&self) -> usize {
unimplemented!()
}
}
/// Batch iterator over a [`BTreeMemtable`].
///
/// The map's read lock is taken once per batch, and iteration resumes after
/// `last_key` on the next call.
struct BTreeIterator {
/// Iteration options, e.g. the batch size.
ctx: IterContext,
/// Schema of the memtable being iterated.
schema: MemtableSchema,
/// Map shared with the memtable that created this iterator.
map: Arc<RwLockMap>,
/// Last key returned by the previous batch, `None` before the first batch.
last_key: Option<InnerKey>,
}
impl BatchIterator for BTreeIterator {
    /// Returns the schema this iterator was created with.
    fn schema(&self) -> &MemtableSchema {
        &self.schema
    }

    /// Always reports key ordering.
    fn ordering(&self) -> RowOrdering {
        RowOrdering::Key
    }

    /// Fetches the next batch; this implementation never returns an error.
    fn next(&mut self) -> Result<Option<Batch>> {
        let batch = self.next_batch();
        Ok(batch)
    }
}
impl BTreeIterator {
    /// Creates an iterator over `map` that has not returned any batch yet.
    fn new(ctx: IterContext, schema: MemtableSchema, map: Arc<RwLockMap>) -> BTreeIterator {
        BTreeIterator {
            ctx,
            schema,
            map,
            last_key: None,
        }
    }

    /// Collects up to `ctx.batch_size` rows that follow `last_key` into a [`Batch`].
    ///
    /// Returns `None` when no rows are left. The read lock on the map is held
    /// only for the duration of this call.
    ///
    /// # Panics
    /// Panics if the map's lock is poisoned.
    fn next_batch(&mut self) -> Option<Batch> {
        let map = self.map.read().unwrap();
        let range = if let Some(last_key) = &self.last_key {
            map.range((Bound::Excluded(last_key), Bound::Unbounded))
        } else {
            map.range(..)
        };

        let mut iter = MapIterWrapper::new(range);
        // Seed the wrapper with the last key of the previous batch. Without it,
        // older versions of that row key that sort right after `last_key` would
        // be returned as the first rows of this batch instead of being skipped.
        iter.prev_key = self.last_key.clone();

        let mut keys = Vec::with_capacity(self.ctx.batch_size);
        let mut sequences = UInt64VectorBuilder::with_capacity(self.ctx.batch_size);
        let mut value_types = UInt8VectorBuilder::with_capacity(self.ctx.batch_size);
        let mut values = Vec::with_capacity(self.ctx.batch_size);
        for (inner_key, row_value) in iter.take(self.ctx.batch_size) {
            keys.push(inner_key);
            sequences.push(Some(inner_key.sequence));
            value_types.push(Some(inner_key.value_type.as_u8()));
            values.push(row_value);
        }

        if keys.is_empty() {
            return None;
        }
        // Remember where this batch ended so the next call resumes after it.
        self.last_key = keys.last().map(|k| (*k).clone());

        Some(Batch {
            keys: rows_to_vectors(keys.as_slice()),
            sequences: sequences.finish(),
            value_types: value_types.finish(),
            values: rows_to_vectors(values.as_slice()),
        })
    }
}
/// `MapIterWrapper` wraps a range over the memtable map and skips entries whose
/// row key equals the row key of the previously returned entry.
///
/// Keys with the same row key are ordered by descending sequence, so the entry
/// kept is the one with the newest sequence and the older ones are dropped.
///
/// The wrapper is not generic: it always iterates `InnerKey`/`RowValue` pairs.
struct MapIterWrapper<'a> {
    /// Underlying range over the map.
    iter: btree_map::Range<'a, InnerKey, RowValue>,
    /// Key of the previously returned entry, `None` before the first entry.
    prev_key: Option<InnerKey>,
}

impl<'a> MapIterWrapper<'a> {
    /// Wraps `iter` with no previous key recorded.
    fn new(iter: btree_map::Range<'a, InnerKey, RowValue>) -> MapIterWrapper<'a> {
        MapIterWrapper {
            iter,
            prev_key: None,
        }
    }
}

impl<'a> Iterator for MapIterWrapper<'a> {
    type Item = (&'a InnerKey, &'a RowValue);

    /// Returns the next entry whose row key differs from the previous one, or
    /// `None` once the underlying range is exhausted.
    fn next(&mut self) -> Option<(&'a InnerKey, &'a RowValue)> {
        let (mut current_key, mut current_value) = self.iter.next()?;

        if let Some(prev_key) = self.prev_key.take() {
            // Skip the remaining (older) entries of the previous row key.
            while prev_key.is_row_key_equal(current_key) {
                (current_key, current_value) = self.iter.next()?;
            }
        }

        self.prev_key = Some(current_key.clone());
        Some((current_key, current_value))
    }
}
struct IterRow<'a> {
kvs: &'a KeyValues,
index: usize,
@@ -61,22 +184,22 @@ impl<'a> IterRow<'a> {
}
}
fn fetch_row(&mut self) -> (RowKey, RowValue) {
let keys = self
fn fetch_row(&mut self) -> (InnerKey, RowValue) {
let row_key = self
.kvs
.keys
.iter()
.map(|vector| vector.get(self.index))
.collect();
let row_key = RowKey {
keys,
let inner_key = InnerKey {
row_key,
sequence: self.kvs.sequence,
index_in_batch: self.kvs.start_index_in_batch + self.index,
value_type: self.kvs.value_type,
};
let row_value = RowValue {
_values: self
values: self
.kvs
.values
.iter()
@@ -84,14 +207,16 @@ impl<'a> IterRow<'a> {
.collect(),
};
(row_key, row_value)
self.index += 1;
(inner_key, row_value)
}
}
impl<'a> Iterator for IterRow<'a> {
type Item = (RowKey, RowValue);
type Item = (InnerKey, RowValue);
fn next(&mut self) -> Option<(RowKey, RowValue)> {
fn next(&mut self) -> Option<(InnerKey, RowValue)> {
if self.index >= self.len {
return None;
}
@@ -104,33 +229,99 @@ impl<'a> Iterator for IterRow<'a> {
}
}
// TODO(yingwen): Actually the version and timestamp may order desc.
#[derive(PartialEq, Eq)]
struct RowKey {
keys: Vec<Value>,
#[derive(Clone, Debug, PartialEq, Eq)]
struct InnerKey {
row_key: Vec<Value>,
sequence: SequenceNumber,
index_in_batch: usize,
value_type: ValueType,
}
impl Ord for RowKey {
fn cmp(&self, other: &RowKey) -> Ordering {
// Order by (keys asc, sequence desc, index_in_batch desc, value type desc), though (key,
impl Ord for InnerKey {
fn cmp(&self, other: &InnerKey) -> Ordering {
// Order by (row_key asc, sequence desc, index_in_batch desc, value type desc), though (key,
// sequence, index_in_batch) should be enough to disambiguate.
self.keys
.cmp(&other.keys)
self.row_key
.cmp(&other.row_key)
.then_with(|| other.sequence.cmp(&self.sequence))
.then_with(|| other.index_in_batch.cmp(&self.index_in_batch))
.then_with(|| other.value_type.cmp(&self.value_type))
}
}
impl PartialOrd for RowKey {
fn partial_cmp(&self, other: &RowKey) -> Option<Ordering> {
impl PartialOrd for InnerKey {
fn partial_cmp(&self, other: &InnerKey) -> Option<Ordering> {
Some(self.cmp(other))
}
}
struct RowValue {
_values: Vec<Value>,
impl InnerKey {
    /// Returns `true` when both keys have the same row key, ignoring sequence,
    /// index in batch and value type.
    fn is_row_key_equal(&self, other: &InnerKey) -> bool {
        let (mine, theirs) = (&self.row_key, &other.row_key);
        mine == theirs
    }
}
#[derive(Clone, Debug)]
struct RowValue {
values: Vec<Value>,
}
/// Row-oriented view over a collection of rows, each row being a `Vec<Value>`.
trait RowsProvider {
/// Returns the number of rows.
fn row_num(&self) -> usize;
/// Returns the number of columns, taken from the first row.
///
/// Calls `row_by_index(0)`, so callers should check `is_empty()` first.
fn column_num(&self) -> usize {
self.row_by_index(0).len()
}
/// Returns `true` when there are no rows.
fn is_empty(&self) -> bool {
self.row_num() == 0
}
/// Returns the values of the row at `idx`.
fn row_by_index(&self, idx: usize) -> &Vec<Value>;
}
impl<'a> RowsProvider for &'a [&InnerKey] {
    /// Number of keys in the slice.
    fn row_num(&self) -> usize {
        (**self).len()
    }

    /// Row key values of the key at `idx`; panics if `idx` is out of bounds.
    fn row_by_index(&self, idx: usize) -> &Vec<Value> {
        let inner_key = self[idx];
        &inner_key.row_key
    }
}
impl<'a> RowsProvider for &'a [&RowValue] {
    /// Number of row values in the slice.
    fn row_num(&self) -> usize {
        (**self).len()
    }

    /// Values of the row at `idx`; panics if `idx` is out of bounds.
    fn row_by_index(&self, idx: usize) -> &Vec<Value> {
        let row_value = self[idx];
        &row_value.values
    }
}
/// Converts row-oriented values from `provider` into one vector per column.
///
/// Returns an empty `Vec` when `provider` has no rows. The column count is
/// taken from the first row.
///
/// The data type of each column is inferred from the first non-null value in
/// that column, so a column that starts with `Null` still gets a builder of
/// the right type. A column that contains only nulls is built as a null vector.
///
/// # Panics
/// Panics if a row has fewer values than the first row, if a column mixes
/// non-null values of different types, or if a value's data type is not
/// supported.
fn rows_to_vectors<T: RowsProvider>(provider: T) -> Vec<VectorRef> {
    if provider.is_empty() {
        return Vec::new();
    }

    let row_num = provider.row_num();
    provider
        .row_by_index(0)
        .iter()
        .enumerate()
        .map(|(col_idx, first_value)| {
            // Infer the type from the first non-null value; fall back to the
            // first row's value when every value in the column is null.
            let typed_value = (0..row_num)
                .map(|row_idx| &provider.row_by_index(row_idx)[col_idx])
                .find(|value| !value.is_null())
                .unwrap_or(first_value);

            let mut builder = VectorBuilder::with_capacity(typed_value.data_type(), row_num);
            for row_idx in 0..row_num {
                builder.push(&provider.row_by_index(row_idx)[col_idx]);
            }
            builder.finish()
        })
        .collect()
}

View File

@@ -1,5 +1,6 @@
use crate::metadata::{ColumnMetadata, ColumnsRowKeyMetadataRef};
#[derive(Clone, Debug, PartialEq)]
pub struct MemtableSchema {
columns_row_key: ColumnsRowKeyMetadataRef,
}

View File

@@ -0,0 +1,304 @@
use datatypes::prelude::*;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::{Int64VectorBuilder, UInt64VectorBuilder};
use super::*;
use crate::metadata::RegionMetadata;
use crate::test_util::descriptor_util::RegionDescBuilder;
// Schema for testing memtable:
// - key: Int64(timestamp), UInt64(version),
// - value: UInt64
fn schema_for_test() -> MemtableSchema {
    // Build a region descriptor only to reuse its columns_row_key metadata.
    let region_desc = RegionDescBuilder::new("test")
        .push_value_column(("v1", LogicalTypeId::UInt64, true))
        .build();
    let region_metadata: RegionMetadata = region_desc.try_into().unwrap();

    MemtableSchema::new(region_metadata.columns_row_key)
}
/// Builds a [`KeyValues`] from `(timestamp, version)` keys and optional values.
///
/// Panics if `keys` and `values` have different lengths.
fn kvs_for_test_with_index(
    sequence: SequenceNumber,
    value_type: ValueType,
    start_index_in_batch: usize,
    keys: &[(i64, u64)],
    values: &[Option<u64>],
) -> KeyValues {
    assert_eq!(keys.len(), values.len());

    // One builder per key column.
    let mut ts_builder = Int64VectorBuilder::with_capacity(keys.len());
    let mut version_builder = UInt64VectorBuilder::with_capacity(keys.len());
    for &(ts, version) in keys {
        ts_builder.push(Some(ts));
        version_builder.push(Some(version));
    }
    let row_keys = vec![
        Arc::new(ts_builder.finish()) as _,
        Arc::new(version_builder.finish()) as _,
    ];

    let mut value_builder = UInt64VectorBuilder::with_capacity(values.len());
    for &value in values {
        value_builder.push(value);
    }
    let row_values = vec![Arc::new(value_builder.finish()) as _];

    let kvs = KeyValues {
        sequence,
        value_type,
        start_index_in_batch,
        keys: row_keys,
        values: row_values,
    };

    // Sanity-check `len`/`is_empty` against the input.
    assert_eq!(keys.len(), kvs.len());
    assert_eq!(keys.is_empty(), kvs.is_empty());

    kvs
}
/// Same as `kvs_for_test_with_index`, with the start index in batch set to 0.
fn kvs_for_test(
    sequence: SequenceNumber,
    value_type: ValueType,
    keys: &[(i64, u64)],
    values: &[Option<u64>],
) -> KeyValues {
    let start_index_in_batch = 0;
    kvs_for_test_with_index(sequence, value_type, start_index_in_batch, keys, values)
}
/// Writes the given keys and values to `memtable`, panicking if the write fails.
fn write_kvs(
    memtable: &dyn Memtable,
    sequence: SequenceNumber,
    value_type: ValueType,
    keys: &[(i64, u64)],
    values: &[Option<u64>],
) {
    memtable
        .write(&kvs_for_test(sequence, value_type, keys, values))
        .unwrap();
}
/// Asserts that `batch` has two key columns and one value column, and that all
/// of its columns have the same number of rows.
fn check_batch_valid(batch: &Batch) {
    assert_eq!(2, batch.keys.len());
    assert_eq!(1, batch.values.len());

    // Every other column must be as long as the first key column.
    let expected_rows = batch.keys[0].len();
    assert_eq!(expected_rows, batch.keys[1].len());
    assert_eq!(expected_rows, batch.sequences.len());
    assert_eq!(expected_rows, batch.value_types.len());
    assert_eq!(expected_rows, batch.values[0].len());
}
/// Drains `iter` and asserts that the rows it returns match the expected keys,
/// sequences, value types and values, in order.
fn check_iter_content(
    iter: &mut dyn BatchIterator,
    keys: &[(i64, u64)],
    sequences: &[u64],
    value_types: &[ValueType],
    values: &[Option<u64>],
) {
    // Number of rows checked so far, across all batches.
    let mut checked = 0;
    while let Some(batch) = iter.next().unwrap() {
        check_batch_valid(&batch);

        for row in 0..batch.keys[0].len() {
            let actual_ts = batch.keys[0].get(row);
            let actual_version = batch.keys[1].get(row);
            let actual_sequence = batch.sequences.get_data(row).unwrap();
            let actual_value_type = batch.value_types.get_data(row).unwrap();
            let actual_value = batch.values[0].get(row);

            assert_eq!(Value::from(keys[checked].0), actual_ts);
            assert_eq!(Value::from(keys[checked].1), actual_version);
            assert_eq!(sequences[checked], actual_sequence);
            assert_eq!(value_types[checked].as_u8(), actual_value_type);
            assert_eq!(Value::from(values[checked]), actual_value);

            checked += 1;
        }
    }

    assert_eq!(keys.len(), checked);
}
// TODO(yingwen): Check size of the returned batch.
/// Test helper that runs a test case against a memtable built by each builder.
struct MemtableTester {
/// Schema used to build every memtable.
schema: MemtableSchema,
/// Builders of the memtable implementations under test.
builders: Vec<MemtableBuilderRef>,
}
impl Default for MemtableTester {
fn default() -> MemtableTester {
MemtableTester::new()
}
}
impl MemtableTester {
    /// Creates a tester with the test schema and the default memtable builder.
    fn new() -> MemtableTester {
        MemtableTester {
            schema: schema_for_test(),
            builders: vec![Arc::new(DefaultMemtableBuilder {}) as _],
        }
    }

    /// Builds one new memtable per builder.
    fn new_memtables(&self) -> Vec<MemtableRef> {
        let mut memtables = Vec::with_capacity(self.builders.len());
        for builder in &self.builders {
            memtables.push(builder.build(self.schema.clone()));
        }
        memtables
    }

    /// Runs `testcase` once for each newly built memtable.
    fn run_testcase<F>(&self, testcase: F)
    where
        F: Fn(TestContext),
    {
        self.new_memtables().into_iter().for_each(|memtable| {
            testcase(TestContext {
                schema: self.schema.clone(),
                memtable,
            })
        });
    }
}
/// Arguments passed to a test case by `MemtableTester::run_testcase`.
struct TestContext {
/// Schema the memtable was built with.
schema: MemtableSchema,
/// The memtable under test.
memtable: MemtableRef,
}
/// Writes two groups of rows and checks that iteration returns them ordered by
/// key, for several batch sizes.
fn write_iter_memtable_case(ctx: &TestContext) {
    // Test iterating an empty memtable.
    let mut empty_iter = ctx.memtable.iter(IterContext::default()).unwrap();
    assert!(empty_iter.next().unwrap().is_none());

    // Init test data.
    write_kvs(
        &*ctx.memtable,
        10, // sequence
        ValueType::Put,
        &[
            (1000, 1),
            (1000, 2),
            (2002, 1),
            (2003, 1),
            (2003, 5),
            (1001, 1),
        ], // keys
        &[Some(1), Some(2), Some(7), Some(8), Some(9), Some(3)], // values
    );
    write_kvs(
        &*ctx.memtable,
        11, // sequence
        ValueType::Put,
        &[(1002, 1), (1003, 1), (1004, 1)], // keys
        &[None, Some(5), None],             // values
    );

    // Expected rows in key order; they are the same for every batch size.
    let expect_keys: [(i64, u64); 9] = [
        (1000, 1),
        (1000, 2),
        (1001, 1),
        (1002, 1),
        (1003, 1),
        (1004, 1),
        (2002, 1),
        (2003, 1),
        (2003, 5),
    ];
    let expect_sequences: [u64; 9] = [10, 10, 10, 11, 11, 11, 10, 10, 10];
    let expect_value_types = [ValueType::Put; 9];
    let expect_values: [Option<u64>; 9] = [
        Some(1),
        Some(2),
        Some(3),
        None,
        Some(5),
        None,
        Some(7),
        Some(8),
        Some(9),
    ];

    for batch_size in [1, 4, 8, 256] {
        let mut iter = ctx.memtable.iter(IterContext { batch_size }).unwrap();
        assert_eq!(ctx.schema, *iter.schema());
        assert_eq!(RowOrdering::Key, iter.ordering());

        check_iter_content(
            &mut *iter,
            &expect_keys,
            &expect_sequences,
            &expect_value_types,
            &expect_values,
        );
    }
}
#[test]
fn test_write_iter_memtable() {
    // Run the write/iterate case against every memtable implementation.
    MemtableTester::default().run_testcase(|ctx| write_iter_memtable_case(&ctx));
}
/// Drains `iter` and asserts that every batch has `batch_size` rows, except
/// that the last batch holds whatever remains of `total`.
fn check_iter_batch_size(iter: &mut dyn BatchIterator, total: usize, batch_size: usize) {
    let mut remains = total;
    while let Some(batch) = iter.next().unwrap() {
        check_batch_valid(&batch);

        // A batch is full unless fewer than `batch_size` rows remain.
        let expected_rows = remains.min(batch_size);
        assert_eq!(expected_rows, batch.keys[0].len());
        remains -= expected_rows;
    }

    assert_eq!(0, remains);
}
#[test]
fn test_iter_batch_size() {
    MemtableTester::default().run_testcase(|ctx| {
        let keys: [(i64, u64); 6] = [
            (1000, 1),
            (1000, 2),
            (1001, 1),
            (2002, 1),
            (2003, 1),
            (2003, 5),
        ];
        let values: [Option<u64>; 6] = [Some(1), Some(2), Some(3), Some(4), None, None];
        write_kvs(
            &*ctx.memtable,
            10, // sequence
            ValueType::Put,
            &keys,
            &values,
        );

        let total = 6;
        // Batch size [less than, equal to, greater than] total
        for batch_size in [1, 6, 8] {
            let mut iter = ctx.memtable.iter(IterContext { batch_size }).unwrap();
            check_iter_batch_size(&mut *iter, total, batch_size);
        }
    });
}

View File

@@ -48,7 +48,7 @@ pub type VersionNumber = u32;
// TODO(yingwen): Make some fields of metadata private.
/// In memory metadata of region.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct RegionMetadata {
/// Schema of the region.
///
@@ -66,13 +66,13 @@ pub struct RegionMetadata {
pub type RegionMetadataRef = Arc<RegionMetadata>;
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq)]
pub struct ColumnMetadata {
pub cf_id: ColumnFamilyId,
pub desc: ColumnDescriptor,
}
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq)]
pub struct ColumnsMetadata {
/// All columns, in `(key columns, timestamp, [version,] value columns)` order.
///
@@ -82,7 +82,7 @@ pub struct ColumnsMetadata {
pub name_to_col_index: HashMap<String, usize>,
}
#[derive(Default, Clone)]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct RowKeyMetadata {
/// Exclusive end index of row key columns.
row_key_end: usize,
@@ -93,7 +93,7 @@ pub struct RowKeyMetadata {
pub enable_version_column: bool,
}
#[derive(Clone)]
#[derive(Clone, Debug, PartialEq)]
pub struct ColumnsRowKeyMetadata {
columns: ColumnsMetadata,
row_key: RowKeyMetadata,
@@ -121,7 +121,7 @@ impl ColumnsRowKeyMetadata {
pub type ColumnsRowKeyMetadataRef = Arc<ColumnsRowKeyMetadata>;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ColumnFamiliesMetadata {
/// Map column family id to column family metadata.
id_to_cfs: HashMap<ColumnFamilyId, ColumnFamilyMetadata>,
@@ -133,7 +133,7 @@ impl ColumnFamiliesMetadata {
}
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ColumnFamilyMetadata {
/// Column family name.
pub name: String,

View File

@@ -9,7 +9,7 @@ pub type ColumnFamilyId = u32;
// TODO(yingwen): Validate default value has same type with column, and name is a valid column name.
/// A [ColumnDescriptor] contains information to create a column.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnDescriptor {
pub id: ColumnId,
pub name: String,
@@ -29,7 +29,7 @@ impl From<&ColumnDescriptor> for ColumnSchema {
}
/// A [RowKeyDescriptor] contains information about row key.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct RowKeyDescriptor {
pub columns: Vec<ColumnDescriptor>,
/// Timestamp key column.
@@ -41,7 +41,7 @@ pub struct RowKeyDescriptor {
}
/// A [ColumnFamilyDescriptor] contains information to create a column family.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnFamilyDescriptor {
pub cf_id: ColumnFamilyId,
pub name: String,
@@ -50,7 +50,7 @@ pub struct ColumnFamilyDescriptor {
}
/// A [RegionDescriptor] contains information to create a region.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct RegionDescriptor {
/// Region name.
pub name: String,

View File

@@ -10,3 +10,9 @@ pub enum ValueType {
/// Put operation.
Put,
}
impl ValueType {
    /// Returns this value type cast to `u8`.
    pub fn as_u8(&self) -> u8 {
        let value_type = *self;
        value_type as u8
    }
}