mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 22:02:56 +00:00
feat: Impl BatchIterator for btree memtable
feat: Impl MapIterWrapper refactor: Rename RowKey to InnerKey
This commit is contained in:
@@ -40,7 +40,7 @@ macro_rules! for_all_primitive_types{
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! with_match_primitive_type_id {
|
||||
($key_type:expr, | $_:tt $T:ident | $body:tt, $nbody:tt) => {{
|
||||
($key_type:expr, | $_:tt $T:ident | $body:tt, $nbody:tt) => {{
|
||||
macro_rules! __with_ty__ {
|
||||
( $_ $T:ident ) => {
|
||||
$body
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use common_base::bytes::{Bytes, StringBytes};
|
||||
use ordered_float::OrderedFloat;
|
||||
pub use ordered_float::OrderedFloat;
|
||||
use serde::{Serialize, Serializer};
|
||||
|
||||
use crate::data_type::ConcreteDataType;
|
||||
|
||||
pub type OrderedF32 = OrderedFloat<f32>;
|
||||
pub type OrderedF64 = OrderedFloat<f64>;
|
||||
|
||||
@@ -36,6 +38,38 @@ pub enum Value {
|
||||
DateTime(i64),
|
||||
}
|
||||
|
||||
impl Value {
|
||||
/// Returns data type of the value.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the data type is not supported.
|
||||
pub fn data_type(&self) -> ConcreteDataType {
|
||||
match self {
|
||||
Value::Null => ConcreteDataType::null_datatype(),
|
||||
Value::Boolean(_) => ConcreteDataType::boolean_datatype(),
|
||||
Value::UInt8(_) => ConcreteDataType::uint8_datatype(),
|
||||
Value::UInt16(_) => ConcreteDataType::uint16_datatype(),
|
||||
Value::UInt32(_) => ConcreteDataType::uint32_datatype(),
|
||||
Value::UInt64(_) => ConcreteDataType::uint64_datatype(),
|
||||
Value::Int8(_) => ConcreteDataType::int8_datatype(),
|
||||
Value::Int16(_) => ConcreteDataType::int16_datatype(),
|
||||
Value::Int32(_) => ConcreteDataType::int32_datatype(),
|
||||
Value::Int64(_) => ConcreteDataType::int64_datatype(),
|
||||
Value::Float32(_) => ConcreteDataType::float32_datatype(),
|
||||
Value::Float64(_) => ConcreteDataType::float64_datatype(),
|
||||
Value::String(_) => ConcreteDataType::string_datatype(),
|
||||
Value::Binary(_) => ConcreteDataType::binary_datatype(),
|
||||
Value::Date(_) | Value::DateTime(_) => {
|
||||
unimplemented!("Unsupported data type of value {:?}", self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_null(&self) -> bool {
|
||||
matches!(self, Value::Null)
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_from {
|
||||
($Variant:ident, $Type:ident) => {
|
||||
impl From<$Type> for Value {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod binary;
|
||||
pub mod boolean;
|
||||
mod builder;
|
||||
pub mod constant;
|
||||
mod helper;
|
||||
pub mod mutable;
|
||||
@@ -14,6 +15,7 @@ use arrow::array::ArrayRef;
|
||||
use arrow::bitmap::Bitmap;
|
||||
pub use binary::*;
|
||||
pub use boolean::*;
|
||||
pub use builder::VectorBuilder;
|
||||
pub use constant::*;
|
||||
pub use helper::Helper;
|
||||
pub use mutable::MutableVector;
|
||||
@@ -26,11 +28,6 @@ use crate::data_type::ConcreteDataType;
|
||||
use crate::error::{BadArrayAccessSnafu, Result};
|
||||
use crate::serialize::Serializable;
|
||||
use crate::value::Value;
|
||||
pub use crate::vectors::{
|
||||
BinaryVector, BooleanVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector,
|
||||
Int64Vector, Int8Vector, NullVector, StringVector, UInt16Vector, UInt32Vector, UInt64Vector,
|
||||
UInt8Vector,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Validity<'a> {
|
||||
|
||||
225
src/datatypes/src/vectors/builder.rs
Normal file
225
src/datatypes/src/vectors/builder.rs
Normal file
@@ -0,0 +1,225 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::data_type::ConcreteDataType;
|
||||
use crate::scalars::ScalarVectorBuilder;
|
||||
use crate::value::Value;
|
||||
use crate::vectors::{
|
||||
BinaryVectorBuilder, BooleanVectorBuilder, Float32VectorBuilder, Float64VectorBuilder,
|
||||
Int16VectorBuilder, Int32VectorBuilder, Int64VectorBuilder, Int8VectorBuilder, NullVector,
|
||||
StringVectorBuilder, UInt16VectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
|
||||
UInt8VectorBuilder, VectorRef,
|
||||
};
|
||||
|
||||
pub enum VectorBuilder {
|
||||
Null(usize),
|
||||
|
||||
// Numeric types:
|
||||
Boolean(BooleanVectorBuilder),
|
||||
UInt8(UInt8VectorBuilder),
|
||||
UInt16(UInt16VectorBuilder),
|
||||
UInt32(UInt32VectorBuilder),
|
||||
UInt64(UInt64VectorBuilder),
|
||||
Int8(Int8VectorBuilder),
|
||||
Int16(Int16VectorBuilder),
|
||||
Int32(Int32VectorBuilder),
|
||||
Int64(Int64VectorBuilder),
|
||||
Float32(Float32VectorBuilder),
|
||||
Float64(Float64VectorBuilder),
|
||||
|
||||
// String types:
|
||||
String(StringVectorBuilder),
|
||||
Binary(BinaryVectorBuilder),
|
||||
}
|
||||
|
||||
impl VectorBuilder {
|
||||
pub fn new(data_type: ConcreteDataType) -> VectorBuilder {
|
||||
VectorBuilder::with_capacity(data_type, 0)
|
||||
}
|
||||
|
||||
pub fn with_capacity(data_type: ConcreteDataType, capacity: usize) -> VectorBuilder {
|
||||
match data_type {
|
||||
ConcreteDataType::Null(_) => VectorBuilder::Null(0),
|
||||
ConcreteDataType::Boolean(_) => {
|
||||
VectorBuilder::Boolean(BooleanVectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::UInt8(_) => {
|
||||
VectorBuilder::UInt8(UInt8VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::UInt16(_) => {
|
||||
VectorBuilder::UInt16(UInt16VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::UInt32(_) => {
|
||||
VectorBuilder::UInt32(UInt32VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::UInt64(_) => {
|
||||
VectorBuilder::UInt64(UInt64VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::Int8(_) => {
|
||||
VectorBuilder::Int8(Int8VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::Int16(_) => {
|
||||
VectorBuilder::Int16(Int16VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::Int32(_) => {
|
||||
VectorBuilder::Int32(Int32VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::Int64(_) => {
|
||||
VectorBuilder::Int64(Int64VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::Float32(_) => {
|
||||
VectorBuilder::Float32(Float32VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::Float64(_) => {
|
||||
VectorBuilder::Float64(Float64VectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::String(_) => {
|
||||
VectorBuilder::String(StringVectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
ConcreteDataType::Binary(_) => {
|
||||
VectorBuilder::Binary(BinaryVectorBuilder::with_capacity(capacity))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push(&mut self, value: &Value) {
|
||||
if value.is_null() {
|
||||
self.push_null();
|
||||
return;
|
||||
}
|
||||
|
||||
match (self, value) {
|
||||
(VectorBuilder::Boolean(b), Value::Boolean(v)) => b.push(Some(*v)),
|
||||
(VectorBuilder::UInt8(b), Value::UInt8(v)) => b.push(Some(*v)),
|
||||
(VectorBuilder::UInt16(b), Value::UInt16(v)) => b.push(Some(*v)),
|
||||
(VectorBuilder::UInt32(b), Value::UInt32(v)) => b.push(Some(*v)),
|
||||
(VectorBuilder::UInt64(b), Value::UInt64(v)) => b.push(Some(*v)),
|
||||
(VectorBuilder::Int8(b), Value::Int8(v)) => b.push(Some(*v)),
|
||||
(VectorBuilder::Int16(b), Value::Int16(v)) => b.push(Some(*v)),
|
||||
(VectorBuilder::Int32(b), Value::Int32(v)) => b.push(Some(*v)),
|
||||
(VectorBuilder::Int64(b), Value::Int64(v)) => b.push(Some(*v)),
|
||||
(VectorBuilder::Float32(b), Value::Float32(v)) => b.push(Some(v.into_inner())),
|
||||
(VectorBuilder::Float64(b), Value::Float64(v)) => b.push(Some(v.into_inner())),
|
||||
(VectorBuilder::String(b), Value::String(v)) => b.push(Some(v.as_utf8())),
|
||||
(VectorBuilder::Binary(b), Value::Binary(v)) => b.push(Some(v)),
|
||||
_ => panic!("Value {:?} does not match builder type", value),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_null(&mut self) {
|
||||
match self {
|
||||
VectorBuilder::Null(v) => *v += 1,
|
||||
VectorBuilder::Boolean(b) => b.push(None),
|
||||
VectorBuilder::UInt8(b) => b.push(None),
|
||||
VectorBuilder::UInt16(b) => b.push(None),
|
||||
VectorBuilder::UInt32(b) => b.push(None),
|
||||
VectorBuilder::UInt64(b) => b.push(None),
|
||||
VectorBuilder::Int8(b) => b.push(None),
|
||||
VectorBuilder::Int16(b) => b.push(None),
|
||||
VectorBuilder::Int32(b) => b.push(None),
|
||||
VectorBuilder::Int64(b) => b.push(None),
|
||||
VectorBuilder::Float32(b) => b.push(None),
|
||||
VectorBuilder::Float64(b) => b.push(None),
|
||||
VectorBuilder::String(b) => b.push(None),
|
||||
VectorBuilder::Binary(b) => b.push(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finish(&mut self) -> VectorRef {
|
||||
match self {
|
||||
VectorBuilder::Null(v) => Arc::new(NullVector::new(*v)),
|
||||
VectorBuilder::Boolean(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::UInt8(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::UInt16(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::UInt32(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::UInt64(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::Int8(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::Int16(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::Int32(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::Int64(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::Float32(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::Float64(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::String(b) => Arc::new(b.finish()),
|
||||
VectorBuilder::Binary(b) => Arc::new(b.finish()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use ordered_float::OrderedFloat;
|
||||
|
||||
use super::*;
|
||||
|
||||
macro_rules! impl_integer_builder_test {
|
||||
($Type: ident, $datatype: ident) => {
|
||||
let mut builder = VectorBuilder::with_capacity(ConcreteDataType::$datatype(), 10);
|
||||
for i in 0..10 {
|
||||
builder.push(&Value::$Type(i));
|
||||
}
|
||||
let vector = builder.finish();
|
||||
|
||||
for i in 0..10 {
|
||||
assert_eq!(Value::$Type(i), vector.get(i as usize));
|
||||
}
|
||||
|
||||
let mut builder = VectorBuilder::new(ConcreteDataType::$datatype());
|
||||
builder.push(&Value::Null);
|
||||
builder.push(&Value::Int32(100));
|
||||
let vector = builder.finish();
|
||||
|
||||
assert!(vector.is_null(0));
|
||||
assert_eq!(Value::Int32(100), vector.get(1));
|
||||
};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_null_vector_builder() {
|
||||
let mut builder = VectorBuilder::new(ConcreteDataType::null_datatype());
|
||||
builder.push(&Value::Null);
|
||||
let vector = builder.finish();
|
||||
assert!(vector.is_null(0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_integer_vector_builder() {
|
||||
impl_integer_builder_test!(UInt8, uint8_datatype);
|
||||
impl_integer_builder_test!(UInt16, uint16_datatype);
|
||||
impl_integer_builder_test!(UInt32, uint32_datatype);
|
||||
impl_integer_builder_test!(UInt64, uint64_datatype);
|
||||
impl_integer_builder_test!(Int8, int8_datatype);
|
||||
impl_integer_builder_test!(Int16, int16_datatype);
|
||||
impl_integer_builder_test!(Int32, int32_datatype);
|
||||
impl_integer_builder_test!(Int64, int64_datatype);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_float_vector_builder() {
|
||||
let mut builder = VectorBuilder::new(ConcreteDataType::float32_datatype());
|
||||
builder.push(&Value::Float32(OrderedFloat(1.0)));
|
||||
let vector = builder.finish();
|
||||
assert_eq!(Value::Float32(OrderedFloat(1.0)), vector.get(0));
|
||||
|
||||
let mut builder = VectorBuilder::new(ConcreteDataType::float64_datatype());
|
||||
builder.push(&Value::Float64(OrderedFloat(2.0)));
|
||||
let vector = builder.finish();
|
||||
assert_eq!(Value::Float64(OrderedFloat(2.0)), vector.get(0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_binary_vector_builder() {
|
||||
let hello: &[u8] = b"hello";
|
||||
let mut builder = VectorBuilder::new(ConcreteDataType::binary_datatype());
|
||||
builder.push(&Value::Binary(hello.into()));
|
||||
let vector = builder.finish();
|
||||
assert_eq!(Value::Binary(hello.into()), vector.get(0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_string_vector_builder() {
|
||||
let hello = "hello";
|
||||
let mut builder = VectorBuilder::new(ConcreteDataType::string_datatype());
|
||||
builder.push(&Value::String(hello.into()));
|
||||
let vector = builder.finish();
|
||||
assert_eq!(Value::String(hello.into()), vector.get(0));
|
||||
}
|
||||
}
|
||||
@@ -15,5 +15,11 @@ pub trait MutableVector: Send + Sync {
|
||||
|
||||
fn as_mut_any(&mut self) -> &mut dyn Any;
|
||||
|
||||
// /// Push a value into the mutable vector.
|
||||
// ///
|
||||
// /// # Panics
|
||||
// /// Panics if the data type of the value differs from the mutable vector's data type.
|
||||
// fn push_value(&mut self, value: &Value);
|
||||
|
||||
fn to_vector(&mut self) -> VectorRef;
|
||||
}
|
||||
|
||||
@@ -209,6 +209,19 @@ impl<T: Primitive + DataTypeBuilder> PrimitiveVectorBuilder<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub type UInt8VectorBuilder = PrimitiveVectorBuilder<u8>;
|
||||
pub type UInt16VectorBuilder = PrimitiveVectorBuilder<u16>;
|
||||
pub type UInt32VectorBuilder = PrimitiveVectorBuilder<u32>;
|
||||
pub type UInt64VectorBuilder = PrimitiveVectorBuilder<u64>;
|
||||
|
||||
pub type Int8VectorBuilder = PrimitiveVectorBuilder<i8>;
|
||||
pub type Int16VectorBuilder = PrimitiveVectorBuilder<i16>;
|
||||
pub type Int32VectorBuilder = PrimitiveVectorBuilder<i32>;
|
||||
pub type Int64VectorBuilder = PrimitiveVectorBuilder<i64>;
|
||||
|
||||
pub type Float32VectorBuilder = PrimitiveVectorBuilder<f32>;
|
||||
pub type Float64VectorBuilder = PrimitiveVectorBuilder<f64>;
|
||||
|
||||
impl<T: Primitive + DataTypeBuilder> MutableVector for PrimitiveVectorBuilder<T> {
|
||||
fn data_type(&self) -> ConcreteDataType {
|
||||
T::build_data_type()
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
mod engine;
|
||||
mod error;
|
||||
mod memtable;
|
||||
pub mod memtable;
|
||||
pub mod metadata;
|
||||
mod region;
|
||||
mod region_writer;
|
||||
|
||||
@@ -20,15 +20,64 @@ pub trait Memtable: Send + Sync {
|
||||
/// Write key/values to the memtable.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panic if the schema of key/value differs from memtable's schema.
|
||||
/// Panics if the schema of key/value differs from memtable's schema.
|
||||
fn write(&self, kvs: &KeyValues) -> Result<()>;
|
||||
|
||||
/// Iterators the memtable.
|
||||
// TODO(yingwen): Consider passing a projector (does column projection).
|
||||
fn iter(&self, ctx: IterContext) -> Result<BatchIteratorPtr>;
|
||||
|
||||
/// Returns the estimated bytes allocated by this memtable from heap.
|
||||
fn bytes_allocated(&self) -> usize;
|
||||
}
|
||||
|
||||
pub type MemtableRef = Arc<dyn Memtable>;
|
||||
|
||||
/// Context for iterating memtable.
|
||||
#[derive(Debug)]
|
||||
pub struct IterContext {
|
||||
/// The suggested batch size of the iterator.
|
||||
pub batch_size: usize,
|
||||
}
|
||||
|
||||
impl Default for IterContext {
|
||||
fn default() -> Self {
|
||||
Self { batch_size: 256 }
|
||||
}
|
||||
}
|
||||
|
||||
/// The ordering of the iterator output.
|
||||
#[derive(Debug)]
|
||||
pub enum RowOrdering {
|
||||
/// The output rows are unordered.
|
||||
Unordered,
|
||||
|
||||
/// The output rows are ordered by key.
|
||||
Key,
|
||||
}
|
||||
|
||||
pub struct Batch {
|
||||
pub keys: Vec<VectorRef>,
|
||||
pub values: Vec<VectorRef>,
|
||||
}
|
||||
|
||||
/// Iterator of memtable.
|
||||
pub trait BatchIterator: Send {
|
||||
/// Returns the schema of this iterator.
|
||||
fn schema(&self) -> &MemtableSchema;
|
||||
|
||||
/// Returns the ordering of the output rows from this iterator.
|
||||
fn ordering(&self) -> RowOrdering;
|
||||
|
||||
/// Fetch next batch from the memtable.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the iterator has already been exhausted.
|
||||
fn next(&mut self) -> Result<Option<Batch>>;
|
||||
}
|
||||
|
||||
pub type BatchIteratorPtr = Box<dyn BatchIterator>;
|
||||
|
||||
pub trait MemtableBuilder: Send + Sync {
|
||||
fn build(&self, schema: MemtableSchema) -> MemtableRef;
|
||||
}
|
||||
@@ -54,9 +103,7 @@ impl KeyValues {
|
||||
self.keys.clear();
|
||||
self.values.clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyValues {
|
||||
pub fn len(&self) -> usize {
|
||||
self.keys.first().map(|v| v.len()).unwrap_or_default()
|
||||
}
|
||||
|
||||
@@ -1,26 +1,34 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::RwLock;
|
||||
use std::collections::{btree_map, BTreeMap};
|
||||
use std::ops::Bound;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorBuilder;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use store_api::storage::{SequenceNumber, ValueType};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::{KeyValues, Memtable, MemtableSchema};
|
||||
use crate::memtable::{
|
||||
Batch, BatchIterator, BatchIteratorPtr, IterContext, KeyValues, Memtable, MemtableSchema,
|
||||
RowOrdering,
|
||||
};
|
||||
|
||||
type RwLockMap = RwLock<BTreeMap<InnerKey, RowValue>>;
|
||||
|
||||
/// A simple memtable implementation based on std's [`BTreeMap`].
|
||||
///
|
||||
/// Mainly for test purpose.
|
||||
/// Mainly for test purpose, don't use in production.
|
||||
pub struct BTreeMemtable {
|
||||
schema: MemtableSchema,
|
||||
map: RwLock<BTreeMap<RowKey, RowValue>>,
|
||||
map: Arc<RwLockMap>,
|
||||
}
|
||||
|
||||
impl BTreeMemtable {
|
||||
pub fn new(schema: MemtableSchema) -> BTreeMemtable {
|
||||
BTreeMemtable {
|
||||
schema,
|
||||
map: RwLock::new(BTreeMap::new()),
|
||||
map: Arc::new(RwLock::new(BTreeMap::new())),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -34,18 +42,177 @@ impl Memtable for BTreeMemtable {
|
||||
let mut map = self.map.write().unwrap();
|
||||
|
||||
let iter_row = IterRow::new(kvs);
|
||||
for (row_key, row_value) in iter_row {
|
||||
map.insert(row_key, row_value);
|
||||
for (inner_key, row_value) in iter_row {
|
||||
map.insert(inner_key, row_value);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn iter(&self, ctx: IterContext) -> Result<BatchIteratorPtr> {
|
||||
let iter = BTreeIterator::new(ctx, self.schema.clone(), self.map.clone());
|
||||
|
||||
Ok(Box::new(iter))
|
||||
}
|
||||
|
||||
fn bytes_allocated(&self) -> usize {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
struct BTreeIterator {
|
||||
ctx: IterContext,
|
||||
schema: MemtableSchema,
|
||||
map: Arc<RwLockMap>,
|
||||
last_key: Option<InnerKey>,
|
||||
}
|
||||
|
||||
impl BatchIterator for BTreeIterator {
|
||||
fn schema(&self) -> &MemtableSchema {
|
||||
&self.schema
|
||||
}
|
||||
|
||||
fn ordering(&self) -> RowOrdering {
|
||||
RowOrdering::Key
|
||||
}
|
||||
|
||||
fn next(&mut self) -> Result<Option<Batch>> {
|
||||
Ok(self.next_batch())
|
||||
}
|
||||
}
|
||||
|
||||
impl BTreeIterator {
|
||||
fn new(ctx: IterContext, schema: MemtableSchema, map: Arc<RwLockMap>) -> BTreeIterator {
|
||||
BTreeIterator {
|
||||
ctx,
|
||||
schema,
|
||||
map,
|
||||
last_key: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn next_batch(&mut self) -> Option<Batch> {
|
||||
let map = self.map.read().unwrap();
|
||||
let iter = if let Some(last_key) = &self.last_key {
|
||||
map.range((Bound::Excluded(last_key), Bound::Unbounded))
|
||||
} else {
|
||||
map.range(..)
|
||||
};
|
||||
let iter = MapIterWrapper::new(iter);
|
||||
|
||||
let mut keys = Vec::with_capacity(self.ctx.batch_size);
|
||||
let mut values = Vec::with_capacity(self.ctx.batch_size);
|
||||
for (inner_key, row_value) in iter.take(self.ctx.batch_size) {
|
||||
keys.push(inner_key);
|
||||
values.push(row_value);
|
||||
}
|
||||
|
||||
if keys.is_empty() {
|
||||
return None;
|
||||
}
|
||||
self.last_key = keys.last().map(|k| (*k).clone());
|
||||
|
||||
Some(Batch {
|
||||
keys: Self::keys_to_vectors(&keys),
|
||||
values: Self::values_to_vectors(&values),
|
||||
})
|
||||
}
|
||||
|
||||
// Assumes column num of all row key is equal.
|
||||
fn keys_to_vectors(keys: &[&InnerKey]) -> Vec<VectorRef> {
|
||||
if keys.is_empty() {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let column_num = keys[0].row_key.len();
|
||||
let row_num = keys.len();
|
||||
let mut builders = Vec::with_capacity(column_num);
|
||||
for v in &keys[0].row_key {
|
||||
builders.push(VectorBuilder::with_capacity(v.data_type(), row_num));
|
||||
}
|
||||
|
||||
let mut vectors = Vec::with_capacity(column_num);
|
||||
for (col_idx, builder) in builders.iter_mut().enumerate() {
|
||||
for row_key in keys {
|
||||
let value = &row_key.row_key[col_idx];
|
||||
builder.push(value);
|
||||
}
|
||||
|
||||
vectors.push(builder.finish());
|
||||
}
|
||||
|
||||
vectors
|
||||
}
|
||||
|
||||
// Assumes column num of all row value is equal.
|
||||
fn values_to_vectors(values: &[&RowValue]) -> Vec<VectorRef> {
|
||||
if values.is_empty() {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let column_num = values[0].values.len();
|
||||
let row_num = values.len();
|
||||
let mut builders = Vec::with_capacity(column_num);
|
||||
for v in &values[0].values {
|
||||
builders.push(VectorBuilder::with_capacity(v.data_type(), row_num));
|
||||
}
|
||||
|
||||
let mut vectors = Vec::with_capacity(column_num);
|
||||
for (col_idx, builder) in builders.iter_mut().enumerate() {
|
||||
for row_value in values {
|
||||
let value = &row_value.values[col_idx];
|
||||
builder.push(value);
|
||||
}
|
||||
|
||||
vectors.push(builder.finish());
|
||||
}
|
||||
|
||||
vectors
|
||||
}
|
||||
}
|
||||
|
||||
/// `MapIterWrapper` removes same user key with elder sequence.
|
||||
struct MapIterWrapper<'a, InnerKey, RowValue> {
|
||||
iter: btree_map::Range<'a, InnerKey, RowValue>,
|
||||
prev_key: Option<InnerKey>,
|
||||
}
|
||||
|
||||
impl<'a> MapIterWrapper<'a, InnerKey, RowValue> {
|
||||
fn new(
|
||||
iter: btree_map::Range<'a, InnerKey, RowValue>,
|
||||
) -> MapIterWrapper<'a, InnerKey, RowValue> {
|
||||
MapIterWrapper {
|
||||
iter,
|
||||
prev_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for MapIterWrapper<'a, InnerKey, RowValue> {
|
||||
type Item = (&'a InnerKey, &'a RowValue);
|
||||
|
||||
fn next(&mut self) -> Option<(&'a InnerKey, &'a RowValue)> {
|
||||
let (mut current_key, mut current_value) = self.iter.next()?;
|
||||
if self.prev_key.is_none() {
|
||||
self.prev_key = Some(current_key.clone());
|
||||
return Some((current_key, current_value));
|
||||
}
|
||||
|
||||
let prev_key = self.prev_key.take().unwrap();
|
||||
while prev_key.is_row_key_equal(current_key) {
|
||||
if let Some((next_key, next_value)) = self.iter.next() {
|
||||
(current_key, current_value) = (next_key, next_value);
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
self.prev_key = Some(current_key.clone());
|
||||
|
||||
Some((current_key, current_value))
|
||||
}
|
||||
}
|
||||
|
||||
struct IterRow<'a> {
|
||||
kvs: &'a KeyValues,
|
||||
index: usize,
|
||||
@@ -61,22 +228,22 @@ impl<'a> IterRow<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
fn fetch_row(&mut self) -> (RowKey, RowValue) {
|
||||
let keys = self
|
||||
fn fetch_row(&mut self) -> (InnerKey, RowValue) {
|
||||
let row_key = self
|
||||
.kvs
|
||||
.keys
|
||||
.iter()
|
||||
.map(|vector| vector.get(self.index))
|
||||
.collect();
|
||||
let row_key = RowKey {
|
||||
keys,
|
||||
let inner_key = InnerKey {
|
||||
row_key,
|
||||
sequence: self.kvs.sequence,
|
||||
index_in_batch: self.kvs.start_index_in_batch + self.index,
|
||||
value_type: self.kvs.value_type,
|
||||
};
|
||||
|
||||
let row_value = RowValue {
|
||||
_values: self
|
||||
values: self
|
||||
.kvs
|
||||
.values
|
||||
.iter()
|
||||
@@ -84,14 +251,14 @@ impl<'a> IterRow<'a> {
|
||||
.collect(),
|
||||
};
|
||||
|
||||
(row_key, row_value)
|
||||
(inner_key, row_value)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for IterRow<'a> {
|
||||
type Item = (RowKey, RowValue);
|
||||
type Item = (InnerKey, RowValue);
|
||||
|
||||
fn next(&mut self) -> Option<(RowKey, RowValue)> {
|
||||
fn next(&mut self) -> Option<(InnerKey, RowValue)> {
|
||||
if self.index >= self.len {
|
||||
return None;
|
||||
}
|
||||
@@ -104,33 +271,39 @@ impl<'a> Iterator for IterRow<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yingwen): Actually the version and timestamp may order desc.
|
||||
#[derive(PartialEq, Eq)]
|
||||
struct RowKey {
|
||||
keys: Vec<Value>,
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
struct InnerKey {
|
||||
row_key: Vec<Value>,
|
||||
sequence: SequenceNumber,
|
||||
index_in_batch: usize,
|
||||
value_type: ValueType,
|
||||
}
|
||||
|
||||
impl Ord for RowKey {
|
||||
fn cmp(&self, other: &RowKey) -> Ordering {
|
||||
// Order by (keys asc, sequence desc, index_in_batch desc, value type desc), though (key,
|
||||
impl Ord for InnerKey {
|
||||
fn cmp(&self, other: &InnerKey) -> Ordering {
|
||||
// Order by (row_key asc, sequence desc, index_in_batch desc, value type desc), though (key,
|
||||
// sequence, index_in_batch) should be enough to disambiguate.
|
||||
self.keys
|
||||
.cmp(&other.keys)
|
||||
self.row_key
|
||||
.cmp(&other.row_key)
|
||||
.then_with(|| other.sequence.cmp(&self.sequence))
|
||||
.then_with(|| other.index_in_batch.cmp(&self.index_in_batch))
|
||||
.then_with(|| other.value_type.cmp(&self.value_type))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for RowKey {
|
||||
fn partial_cmp(&self, other: &RowKey) -> Option<Ordering> {
|
||||
impl PartialOrd for InnerKey {
|
||||
fn partial_cmp(&self, other: &InnerKey) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
struct RowValue {
|
||||
_values: Vec<Value>,
|
||||
impl InnerKey {
|
||||
fn is_row_key_equal(&self, other: &InnerKey) -> bool {
|
||||
self.row_key == other.row_key
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RowValue {
|
||||
values: Vec<Value>,
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::metadata::{ColumnMetadata, ColumnsRowKeyMetadataRef};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MemtableSchema {
|
||||
columns_row_key: ColumnsRowKeyMetadataRef,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user