feat: Support creating in memory region and writing to memtable (#40)

* chore(store-api): Fix typo in region comments

* feat(storage): Init storage crate

* feat(store-api): Make some method async

* feat(storage): Blank StorageEngine implementation

* feat(storage): StorageEngine returns owned SchemaRef

* feat: pub use arrow in datatypes

* feat(store-api): Implement RegionMetadata

* feat(storage): Impl create region in memory.

* chore(object-store): Format cargo toml

* chore(storage): Log on region created

* feat: Impl CowCell

* feat: Store id to cf meta mapping

* refactor: Refactor version and rename it to VersionControl

* feat: Impl write batch for put, refactor column family

* feat(storage): Skeleton of writing to memtable

* refactor(storage): MemTable returns MemTableSchema

* feat: Add ColumnSchema and conversion between schema and arrow's schema

* feat: Validate put data

* feat: Validate schema of write batch

* feat: insert memtable WIP

* feat: Impl Inserter for memtable

* feat(datatypes): Implement Eq/Ord for Value

feat: Implement Ord/Eq for Bytes/StringBytes and Deref for Bytes

test: Test Value::from()

* feat: Define BTreeMemTable

* fix: Rename get/get_unchecked to try_get/get and fix get not considering null values.

* feat: Impl BTreeMemTable::write()

* refactor: Remove useless ColumnFamilyHandle now

* chore: Clean comment

* feat(common): Add from `String/&str/Vec<u8>/&[u8]` for Value

* test(storage): Add tests for WriteBatch

* chore: Fix clippy

* feat: Add builder for RowKey/ColumnFamilyDescriptor

* test: Add test for metadata

* chore: Fix clippy

* test: Add test for region and engine

* chore: Fix clippy

* chore: Address CR comment
This commit is contained in:
evenyag
2022-06-09 16:50:02 +08:00
committed by GitHub
parent 8fe577649f
commit 4171173b76
64 changed files with 2911 additions and 203 deletions

31
Cargo.lock generated
View File

@@ -67,6 +67,12 @@ version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
[[package]]
name = "arc-swap"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
[[package]]
name = "array-init-cursor"
version = "0.2.0"
@@ -918,6 +924,7 @@ dependencies = [
"common-error",
"datafusion-common",
"enum_dispatch",
"ordered-float 3.0.0",
"paste",
"serde",
"serde_json",
@@ -2116,6 +2123,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "ordered-float"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96bcbab4bfea7a59c2c0fe47211a1ac4e3e96bea6eb446d704f310bc5c732ae2"
dependencies = [
"num-traits",
]
[[package]]
name = "ordered-multimap"
version = "0.4.3"
@@ -2959,6 +2975,20 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "storage"
version = "0.1.0"
dependencies = [
"arc-swap",
"async-trait",
"common-error",
"common-telemetry",
"datatypes",
"snafu",
"store-api",
"tokio",
]
[[package]]
name = "store-api"
version = "0.1.0"
@@ -3044,7 +3074,6 @@ checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
name = "table"
version = "0.1.0"
dependencies = [
"arrow2",
"async-trait",
"chrono",
"common-error",

View File

@@ -14,6 +14,7 @@ members = [
"src/object-store",
"src/query",
"src/sql",
"src/storage",
"src/store-api",
"src/table",
]

View File

@@ -12,6 +12,8 @@ pub enum StatusCode {
Unexpected,
/// Internal server error.
Internal,
/// Invalid arguments.
InvalidArguments,
// ====== End of common status code ================
// ====== Begin of SQL related status code =========

View File

@@ -99,7 +99,7 @@ mod tests {
for i in 0..3 {
let p: f64 = (values[i] as f64).pow(bases[i] as f64);
assert!(matches!(vector.get_unchecked(i), Value::Float64(v) if v == p));
assert!(matches!(vector.get(i), Value::Float64(v) if v == p));
}
}
}

View File

@@ -198,11 +198,11 @@ mod tests {
// clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6]
for i in 0..10 {
if i <= 3 {
assert!(matches!(vector.get_unchecked(i), Value::Int64(v) if v == 3));
assert!(matches!(vector.get(i), Value::Int64(v) if v == 3));
} else if i <= 6 {
assert!(matches!(vector.get_unchecked(i), Value::Int64(v) if v == (i as i64)));
assert!(matches!(vector.get(i), Value::Int64(v) if v == (i as i64)));
} else {
assert!(matches!(vector.get_unchecked(i), Value::Int64(v) if v == 6));
assert!(matches!(vector.get(i), Value::Int64(v) if v == 6));
}
}
@@ -225,11 +225,11 @@ mod tests {
// clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6]
for i in 0..10 {
if i <= 3 {
assert!(matches!(vector.get_unchecked(i), Value::UInt64(v) if v == 3));
assert!(matches!(vector.get(i), Value::UInt64(v) if v == 3));
} else if i <= 6 {
assert!(matches!(vector.get_unchecked(i), Value::UInt64(v) if v == (i as u64)));
assert!(matches!(vector.get(i), Value::UInt64(v) if v == (i as u64)));
} else {
assert!(matches!(vector.get_unchecked(i), Value::UInt64(v) if v == 6));
assert!(matches!(vector.get(i), Value::UInt64(v) if v == 6));
}
}
@@ -252,11 +252,11 @@ mod tests {
// clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6]
for i in 0..10 {
if i <= 3 {
assert!(matches!(vector.get_unchecked(i), Value::Float64(v) if v == 3.0));
assert!(matches!(vector.get(i), Value::Float64(v) if v == 3.0));
} else if i <= 6 {
assert!(matches!(vector.get_unchecked(i), Value::Float64(v) if v == (i as f64)));
assert!(matches!(vector.get(i), Value::Float64(v) if v == (i as f64)));
} else {
assert!(matches!(vector.get_unchecked(i), Value::Float64(v) if v == 6.0));
assert!(matches!(vector.get(i), Value::Float64(v) if v == 6.0));
}
}
}

View File

@@ -83,9 +83,7 @@ mod tests {
assert_eq!(3, vector.len());
for i in 0..3 {
assert!(
matches!(vector.get_unchecked(i), Value::Boolean(b) if b == (i == 0 || i == 2))
);
assert!(matches!(vector.get(i), Value::Boolean(b) if b == (i == 0 || i == 2)));
}
// create a udf and test it again

View File

@@ -3,15 +3,10 @@ name = "common-recordbatch"
version = "0.1.0"
edition = "2021"
[dependencies.arrow]
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dependencies]
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"}
datatypes = {path ="../../datatypes" }
datatypes = { path = "../../datatypes" }
futures = "0.3"
paste = "1.0"
serde = "1.0"

View File

@@ -1,4 +1,4 @@
use arrow::error::ArrowError;
use datatypes::arrow::error::ArrowError;
use snafu::{Backtrace, Snafu};
#[derive(Debug, Snafu)]

View File

@@ -49,7 +49,7 @@ mod tests {
DataType::UInt32,
false,
)]));
let schema = Arc::new(Schema::new(arrow_schema.clone()));
let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap());
let numbers: Vec<u32> = (0..10).collect();
let df_batch = DfRecordBatch::try_new(

View File

@@ -56,7 +56,7 @@ mod tests {
DataType::UInt32,
false,
)]));
let schema = Arc::new(Schema::new(arrow_schema.clone()));
let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap());
let stream = MockRecordBatchStream {
schema: schema.clone(),
@@ -79,7 +79,7 @@ mod tests {
};
let stream = MockRecordBatchStream {
schema: Arc::new(Schema::new(arrow_schema)),
schema: Arc::new(Schema::try_from(arrow_schema).unwrap()),
batch: Some(batch.clone()),
};
let batches = collect(Box::pin(stream)).await.unwrap();

View File

@@ -13,6 +13,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2" }
enum_dispatch = "0.3"
ordered-float = "3.0"
paste = "1.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0"

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use arrow::datatypes::DataType as ArrowDataType;
use paste::paste;
use crate::error::{self, Error, Result};
use crate::type_id::LogicalTypeId;
use crate::types::{
BinaryType, BooleanType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
@@ -81,7 +82,15 @@ impl ConcreteDataType {
/// # Panics
/// Panic if given arrow data type is not supported.
pub fn from_arrow_type(dt: &ArrowDataType) -> Self {
match dt {
ConcreteDataType::try_from(dt).expect("Unimplemented type")
}
}
impl TryFrom<&ArrowDataType> for ConcreteDataType {
type Error = Error;
fn try_from(dt: &ArrowDataType) -> Result<ConcreteDataType> {
let concrete_type = match dt {
ArrowDataType::Null => Self::null_datatype(),
ArrowDataType::Boolean => Self::boolean_datatype(),
ArrowDataType::UInt8 => Self::uint8_datatype(),
@@ -97,9 +106,14 @@ impl ConcreteDataType {
ArrowDataType::Binary | ArrowDataType::LargeBinary => Self::binary_datatype(),
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => Self::string_datatype(),
_ => {
unimplemented!("arrow data_type: {:?}", dt)
return error::UnsupportedArrowTypeSnafu {
arrow_type: dt.clone(),
}
.fail()
}
}
};
Ok(concrete_type)
}
}
@@ -122,6 +136,7 @@ impl_new_concrete_type_functions!(
Binary, String
);
/// Data type abstraction.
#[enum_dispatch::enum_dispatch]
pub trait DataType: std::fmt::Debug + Send + Sync {
/// Name of this data type.

View File

@@ -11,16 +11,25 @@ pub enum Error {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to convert datafusion type: {}", from))]
Conversion { from: String, backtrace: Backtrace },
#[snafu(display("Bad array access, Index out of bounds: {}, size: {}", index, size))]
BadArrayAccess {
index: usize,
size: usize,
backtrace: Backtrace,
},
#[snafu(display("Unknown vector, {}", msg))]
UnknownVector { msg: String, backtrace: Backtrace },
#[snafu(display("Unsupported arrow data type, type: {:?}", arrow_type))]
UnsupportedArrowType {
arrow_type: arrow::datatypes::DataType,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {

View File

@@ -13,3 +13,5 @@ pub mod type_id;
pub mod types;
pub mod value;
pub mod vectors;
pub use arrow;

View File

View File

@@ -1,25 +1,161 @@
use std::collections::HashMap;
use std::sync::Arc;
use arrow::datatypes::Schema as ArrowSchema;
use arrow::datatypes::{Field, Schema as ArrowSchema};
#[derive(Debug, Clone, Eq, PartialEq)]
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{Error, Result};
// TODO(yingwen): Consider assigning a version to each schema so that schemas can be
// compared by comparing their versions.
/// Schema of a single column: its name, data type and nullability.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnSchema {
/// Name of the column.
pub name: String,
/// Data type of the column.
pub data_type: ConcreteDataType,
/// Whether the column is nullable.
pub is_nullable: bool,
}
impl ColumnSchema {
pub fn new<T: Into<String>>(
name: T,
data_type: ConcreteDataType,
is_nullable: bool,
) -> ColumnSchema {
ColumnSchema {
name: name.into(),
data_type,
is_nullable,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct Schema {
column_schemas: Vec<ColumnSchema>,
name_to_index: HashMap<String, usize>,
arrow_schema: Arc<ArrowSchema>,
}
impl Schema {
pub fn new(arrow_schema: Arc<ArrowSchema>) -> Self {
Self { arrow_schema }
pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
let mut fields = Vec::with_capacity(column_schemas.len());
let mut name_to_index = HashMap::with_capacity(column_schemas.len());
for (index, column_schema) in column_schemas.iter().enumerate() {
let field = Field::from(column_schema);
fields.push(field);
name_to_index.insert(column_schema.name.clone(), index);
}
let arrow_schema = Arc::new(ArrowSchema::from(fields));
Schema {
column_schemas,
name_to_index,
arrow_schema,
}
}
pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
&self.arrow_schema
}
pub fn column_schemas(&self) -> &[ColumnSchema] {
&self.column_schemas
}
pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> {
self.name_to_index
.get(name)
.map(|index| &self.column_schemas[*index])
}
}
pub type SchemaRef = Arc<Schema>;
impl From<Arc<ArrowSchema>> for Schema {
fn from(s: Arc<ArrowSchema>) -> Schema {
Schema::new(s)
impl TryFrom<&Field> for ColumnSchema {
    type Error = Error;

    /// Builds a `ColumnSchema` from an arrow `Field`, copying the field's name and
    /// nullability.
    ///
    /// # Errors
    /// Returns the error produced by `ConcreteDataType::try_from` when the field's
    /// arrow data type cannot be converted.
    fn try_from(field: &Field) -> Result<ColumnSchema> {
        ConcreteDataType::try_from(&field.data_type).map(|data_type| ColumnSchema {
            name: field.name.clone(),
            data_type,
            is_nullable: field.is_nullable,
        })
    }
}
impl From<&ColumnSchema> for Field {
    /// Builds an arrow `Field` carrying the column's name, the arrow form of its
    /// data type, and its nullability.
    fn from(column_schema: &ColumnSchema) -> Field {
        let name = column_schema.name.clone();
        let arrow_type = column_schema.data_type.as_arrow_type();
        Field::new(name, arrow_type, column_schema.is_nullable)
    }
}
impl TryFrom<Arc<ArrowSchema>> for Schema {
type Error = Error;
fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> {
let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len());
let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
for field in &arrow_schema.fields {
let column_schema = ColumnSchema::try_from(field)?;
name_to_index.insert(field.name.clone(), column_schemas.len());
column_schemas.push(column_schema);
}
Ok(Self {
column_schemas,
name_to_index,
arrow_schema,
})
}
}
// Unit tests for the conversions between `ColumnSchema`/`Schema` and arrow's
// `Field`/`ArrowSchema`.
#[cfg(test)]
mod tests {
use arrow::datatypes::DataType as ArrowDataType;
use super::*;
// Converts a `ColumnSchema` to an arrow `Field` and back, expecting the round trip
// to yield an equal `ColumnSchema`.
#[test]
fn test_column_schema() {
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
let field = Field::from(&column_schema);
assert_eq!("test", field.name);
assert_eq!(ArrowDataType::Int32, field.data_type);
assert!(field.is_nullable);
let new_column_schema = ColumnSchema::try_from(&field).unwrap();
assert_eq!(column_schema, new_column_schema);
}
// Checks lookup by column name and that a `Schema` built from column schemas equals
// one built from the equivalent arrow schema.
#[test]
fn test_schema() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true),
];
let schema = Schema::new(column_schemas.clone());
for column_schema in &column_schemas {
let found = schema.column_schema_by_name(&column_schema.name).unwrap();
assert_eq!(column_schema, found);
}
// A name that was never added must not be found.
assert!(schema.column_schema_by_name("col3").is_none());
let fields: Vec<_> = column_schemas.iter().map(Field::from).collect();
let arrow_schema = Arc::new(ArrowSchema::from(fields));
let new_schema = Schema::try_from(arrow_schema.clone()).unwrap();
assert_eq!(schema, new_schema);
assert_eq!(column_schemas, schema.column_schemas());
assert_eq!(arrow_schema, *schema.arrow_schema());
assert_eq!(arrow_schema, *new_schema.arrow_schema());
}
}

View File

@@ -1,5 +1,7 @@
use crate::data_type::ConcreteDataType;
/// Unique identifier for logical data type.
#[derive(Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum LogicalTypeId {
Null,
@@ -28,3 +30,29 @@ pub enum LogicalTypeId {
/// seconds/milliseconds/microseconds/nanoseconds, determined by precision.
DateTime,
}
impl LogicalTypeId {
/// Returns the [ConcreteDataType] that corresponds to this logical type id.
///
/// # Panics
/// Panics if data type is not supported. `Date` and `DateTime` currently hit
/// `unimplemented!`.
pub fn data_type(&self) -> ConcreteDataType {
match self {
LogicalTypeId::Null => ConcreteDataType::null_datatype(),
LogicalTypeId::Boolean => ConcreteDataType::boolean_datatype(),
LogicalTypeId::Int8 => ConcreteDataType::int8_datatype(),
LogicalTypeId::Int16 => ConcreteDataType::int16_datatype(),
LogicalTypeId::Int32 => ConcreteDataType::int32_datatype(),
LogicalTypeId::Int64 => ConcreteDataType::int64_datatype(),
LogicalTypeId::UInt8 => ConcreteDataType::uint8_datatype(),
LogicalTypeId::UInt16 => ConcreteDataType::uint16_datatype(),
LogicalTypeId::UInt32 => ConcreteDataType::uint32_datatype(),
LogicalTypeId::UInt64 => ConcreteDataType::uint64_datatype(),
LogicalTypeId::Float32 => ConcreteDataType::float32_datatype(),
LogicalTypeId::Float64 => ConcreteDataType::float64_datatype(),
LogicalTypeId::String => ConcreteDataType::string_datatype(),
LogicalTypeId::Binary => ConcreteDataType::binary_datatype(),
// No concrete data type is mapped for these two variants yet.
LogicalTypeId::Date | LogicalTypeId::DateTime => {
unimplemented!("Data type for {:?} is unimplemented", self)
}
}
}
}

View File

@@ -1,8 +1,16 @@
use common_base::bytes::{Bytes, StringBytes};
use ordered_float::OrderedFloat;
use serde::{Serialize, Serializer};
pub type OrderedF32 = OrderedFloat<f32>;
pub type OrderedF64 = OrderedFloat<f64>;
/// Value holds a single arbitrary value of any [DataType](crate::data_type::DataType).
#[derive(Debug, PartialEq, Clone)]
///
/// Although comparing Values of different data types is allowed, it is recommended to only
/// compare Values of the same data type. Comparing Values of different data types may not
/// behave as you expect.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum Value {
Null,
@@ -16,8 +24,8 @@ pub enum Value {
Int16(i16),
Int32(i32),
Int64(i64),
Float32(f32),
Float64(f64),
Float32(OrderedF32),
Float64(OrderedF64),
// String types:
String(StringBytes),
@@ -32,14 +40,14 @@ macro_rules! impl_from {
($Variant:ident, $Type:ident) => {
impl From<$Type> for Value {
fn from(value: $Type) -> Self {
Value::$Variant(value)
Value::$Variant(value.into())
}
}
impl From<Option<$Type>> for Value {
fn from(value: Option<$Type>) -> Self {
match value {
Some(v) => Value::$Variant(v),
Some(v) => Value::$Variant(v.into()),
None => Value::Null,
}
}
@@ -110,3 +118,89 @@ impl Serialize for Value {
}
}
}
// Unit tests for the `From` conversions into `Value`.
#[cfg(test)]
mod tests {
use super::*;
// Each inner type must convert into the `Value` variant that wraps it; the numeric
// types are checked at their MIN and MAX.
#[test]
fn test_value_from_inner() {
assert_eq!(Value::Boolean(true), Value::from(true));
assert_eq!(Value::Boolean(false), Value::from(false));
assert_eq!(Value::UInt8(u8::MIN), Value::from(u8::MIN));
assert_eq!(Value::UInt8(u8::MAX), Value::from(u8::MAX));
assert_eq!(Value::UInt16(u16::MIN), Value::from(u16::MIN));
assert_eq!(Value::UInt16(u16::MAX), Value::from(u16::MAX));
assert_eq!(Value::UInt32(u32::MIN), Value::from(u32::MIN));
assert_eq!(Value::UInt32(u32::MAX), Value::from(u32::MAX));
assert_eq!(Value::UInt64(u64::MIN), Value::from(u64::MIN));
assert_eq!(Value::UInt64(u64::MAX), Value::from(u64::MAX));
assert_eq!(Value::Int8(i8::MIN), Value::from(i8::MIN));
assert_eq!(Value::Int8(i8::MAX), Value::from(i8::MAX));
assert_eq!(Value::Int16(i16::MIN), Value::from(i16::MIN));
assert_eq!(Value::Int16(i16::MAX), Value::from(i16::MAX));
assert_eq!(Value::Int32(i32::MIN), Value::from(i32::MIN));
assert_eq!(Value::Int32(i32::MAX), Value::from(i32::MAX));
assert_eq!(Value::Int64(i64::MIN), Value::from(i64::MIN));
assert_eq!(Value::Int64(i64::MAX), Value::from(i64::MAX));
// Floats are stored wrapped in `OrderedFloat`.
assert_eq!(
Value::Float32(OrderedFloat(f32::MIN)),
Value::from(f32::MIN)
);
assert_eq!(
Value::Float32(OrderedFloat(f32::MAX)),
Value::from(f32::MAX)
);
assert_eq!(
Value::Float64(OrderedFloat(f64::MIN)),
Value::from(f64::MIN)
);
assert_eq!(
Value::Float64(OrderedFloat(f64::MAX)),
Value::from(f64::MAX)
);
let string_bytes = StringBytes::from("hello");
assert_eq!(
Value::String(string_bytes.clone()),
Value::from(string_bytes)
);
let bytes = Bytes::from(b"world".as_slice());
assert_eq!(Value::Binary(bytes.clone()), Value::from(bytes));
}
// `String` and `&str` must both convert into `Value::String`.
#[test]
fn test_value_from_string() {
let hello = "hello".to_string();
assert_eq!(
Value::String(StringBytes::from(hello.clone())),
Value::from(hello)
);
let world = "world";
assert_eq!(Value::String(StringBytes::from(world)), Value::from(world));
}
// `Vec<u8>` and `&[u8]` must both convert into `Value::Binary`.
#[test]
fn test_value_from_bytes() {
let hello = b"hello".to_vec();
assert_eq!(
Value::Binary(Bytes::from(hello.clone())),
Value::from(hello)
);
let world: &[u8] = b"world";
assert_eq!(Value::Binary(Bytes::from(world)), Value::from(world));
}
}

View File

@@ -104,11 +104,15 @@ pub trait Vector: Send + Sync + Serializable {
fn slice(&self, offset: usize, length: usize) -> VectorRef;
/// # Safety
/// Assumes that the `index` is smaller than size.
fn get_unchecked(&self, index: usize) -> Value;
/// Returns the clone of value at `index`.
///
/// # Panics
/// Panic if `index` is out of bound.
fn get(&self, index: usize) -> Value;
fn get(&self, index: usize) -> Result<Value> {
/// Returns the clone of value at `index` or error if `index`
/// is out of bound.
fn try_get(&self, index: usize) -> Result<Value> {
ensure!(
index < self.len(),
BadArrayAccessSnafu {
@@ -116,7 +120,7 @@ pub trait Vector: Send + Sync + Serializable {
size: self.len()
}
);
Ok(self.get_unchecked(index))
Ok(self.get(index))
}
// Copies each element according to the `offsets` parameter.
@@ -147,7 +151,29 @@ macro_rules! impl_try_from_arrow_array_for_vector {
};
}
pub(crate) use impl_try_from_arrow_array_for_vector;
// Expands to an expression that builds a `Validity` from `$array.validity()`:
// `Validity::Slots(bitmap)` when a bitmap is returned, `Validity::AllValid` otherwise.
macro_rules! impl_validity_for_vector {
($array: expr) => {
match $array.validity() {
Some(bitmap) => Validity::Slots(bitmap),
None => Validity::AllValid,
}
};
}
// Expands to an expression that yields the `Value` at `$index` of `$array`: the
// element converted via `.into()` when `$array.is_valid($index)` is true, and
// `Value::Null` otherwise.
macro_rules! impl_get_for_vector {
($array: expr, $index: ident) => {
if $array.is_valid($index) {
// Safety: The index has been checked by `is_valid()`.
// NOTE(review): this relies on `is_valid()` rejecting out-of-bound indices, including
// for arrays that carry no validity bitmap — confirm against the arrow2 implementation.
unsafe { $array.value_unchecked($index).into() }
} else {
Value::Null
}
};
}
pub(crate) use {
impl_get_for_vector, impl_try_from_arrow_array_for_vector, impl_validity_for_vector,
};
#[cfg(test)]
pub mod tests {

View File

@@ -14,8 +14,7 @@ use crate::error::SerializeSnafu;
use crate::scalars::{common, ScalarVector, ScalarVectorBuilder};
use crate::serialize::Serializable;
use crate::value::Value;
use crate::vectors::impl_try_from_arrow_array_for_vector;
use crate::vectors::{MutableVector, Validity, Vector, VectorRef};
use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
/// Vector of binary strings.
#[derive(Debug)]
@@ -59,10 +58,7 @@ impl Vector for BinaryVector {
}
fn validity(&self) -> Validity {
match self.array.validity() {
Some(bitmap) => Validity::Slots(bitmap),
None => Validity::AllValid,
}
vectors::impl_validity_for_vector!(self.array)
}
fn is_null(&self, row: usize) -> bool {
@@ -73,8 +69,8 @@ impl Vector for BinaryVector {
Arc::new(Self::from(self.array.slice(offset, length)))
}
fn get_unchecked(&self, index: usize) -> Value {
self.array.value(index).into()
fn get(&self, index: usize) -> Value {
vectors::impl_get_for_vector!(self.array, index)
}
fn replicate(&self, offsets: &[usize]) -> VectorRef {
@@ -159,7 +155,7 @@ impl Serializable for BinaryVector {
}
}
impl_try_from_arrow_array_for_vector!(LargeBinaryArray, BinaryVector);
vectors::impl_try_from_arrow_array_for_vector!(LargeBinaryArray, BinaryVector);
#[cfg(test)]
mod tests {
@@ -186,10 +182,7 @@ mod tests {
for i in 0..2 {
assert!(!v.is_null(i));
assert_eq!(
Value::Binary(Bytes::from(vec![1, 2, 3])),
v.get_unchecked(i)
);
assert_eq!(Value::Binary(Bytes::from(vec![1, 2, 3])), v.get(i));
}
let arrow_arr = v.to_arrow_array();
@@ -246,6 +239,9 @@ mod tests {
assert_eq!(b"hello", vector.get_data(0).unwrap());
assert_eq!(None, vector.get_data(3));
assert_eq!(Value::Binary(b"hello".as_slice().into()), vector.get(0));
assert_eq!(Value::Null, vector.get(3));
let mut iter = vector.iter_data();
assert_eq!(b"hello", iter.next().unwrap().unwrap());
assert_eq!(b"happy", iter.next().unwrap().unwrap());

View File

@@ -13,8 +13,7 @@ use crate::scalars::common::replicate_scalar_vector;
use crate::scalars::{ScalarVector, ScalarVectorBuilder};
use crate::serialize::Serializable;
use crate::value::Value;
use crate::vectors::impl_try_from_arrow_array_for_vector;
use crate::vectors::{MutableVector, Validity, Vector, VectorRef};
use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
/// Vector of boolean.
#[derive(Debug)]
@@ -74,10 +73,7 @@ impl Vector for BooleanVector {
}
fn validity(&self) -> Validity {
match self.array.validity() {
Some(bitmap) => Validity::Slots(bitmap),
None => Validity::AllValid,
}
vectors::impl_validity_for_vector!(self.array)
}
fn is_null(&self, row: usize) -> bool {
@@ -88,8 +84,8 @@ impl Vector for BooleanVector {
Arc::new(Self::from(self.array.slice(offset, length)))
}
fn get_unchecked(&self, index: usize) -> Value {
self.array.value(index).into()
fn get(&self, index: usize) -> Value {
vectors::impl_get_for_vector!(self.array, index)
}
fn replicate(&self, offsets: &[usize]) -> VectorRef {
@@ -171,7 +167,7 @@ impl Serializable for BooleanVector {
}
}
impl_try_from_arrow_array_for_vector!(BooleanArray, BooleanVector);
vectors::impl_try_from_arrow_array_for_vector!(BooleanArray, BooleanVector);
#[cfg(test)]
mod tests {
@@ -193,7 +189,7 @@ mod tests {
for (i, b) in bools.iter().enumerate() {
assert!(!v.is_null(i));
assert_eq!(Value::Boolean(*b), v.get_unchecked(i));
assert_eq!(Value::Boolean(*b), v.get(i));
}
let arrow_arr = v.to_arrow_array();
@@ -268,6 +264,7 @@ mod tests {
for (i, v) in input.into_iter().enumerate() {
assert_eq!(v, vector.get_data(i));
assert_eq!(Value::from(v), vector.get(i));
}
}

View File

@@ -81,8 +81,8 @@ impl Vector for ConstantVector {
})
}
fn get_unchecked(&self, _index: usize) -> Value {
self.vector.get_unchecked(0)
fn get(&self, _index: usize) -> Value {
self.vector.get(0)
}
fn replicate(&self, offsets: &[usize]) -> VectorRef {
@@ -100,7 +100,7 @@ impl fmt::Debug for ConstantVector {
write!(
f,
"ConstantVector([{:?}; {}])",
self.get(0).unwrap_or(Value::Null),
self.try_get(0).unwrap_or(Value::Null),
self.len()
)
}
@@ -108,7 +108,7 @@ impl fmt::Debug for ConstantVector {
impl Serializable for ConstantVector {
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
std::iter::repeat(self.get(0)?)
std::iter::repeat(self.try_get(0)?)
.take(self.len())
.map(serde_json::to_value)
.collect::<serde_json::Result<_>>()
@@ -136,7 +136,7 @@ mod tests {
for i in 0..10 {
assert!(!c.is_null(i));
assert_eq!(Value::Int32(1), c.get_unchecked(i));
assert_eq!(Value::Int32(1), c.get(i));
}
let arrow_arr = c.to_arrow_array();

View File

@@ -62,10 +62,6 @@ impl Vector for NullVector {
true
}
fn get_unchecked(&self, _index: usize) -> Value {
Value::Null
}
fn only_null(&self) -> bool {
true
}
@@ -74,6 +70,11 @@ impl Vector for NullVector {
Arc::new(Self::new(length))
}
fn get(&self, _index: usize) -> Value {
// Skips bound check for null array.
Value::Null
}
fn replicate(&self, offsets: &[usize]) -> VectorRef {
debug_assert!(
offsets.len() == self.len(),
@@ -127,7 +128,7 @@ mod tests {
for i in 0..32 {
assert!(v.is_null(i));
assert_eq!(Value::Null, v.get_unchecked(i));
assert_eq!(Value::Null, v.get(i));
}
}

View File

@@ -16,7 +16,7 @@ use crate::scalars::{ScalarVector, ScalarVectorBuilder};
use crate::serialize::Serializable;
use crate::types::{DataTypeBuilder, Primitive};
use crate::value::Value;
use crate::vectors::{MutableVector, Validity, Vector, VectorRef};
use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
/// Vector for primitive data types.
#[derive(Debug)]
@@ -81,10 +81,7 @@ impl<T: Primitive + DataTypeBuilder> Vector for PrimitiveVector<T> {
}
fn validity(&self) -> Validity {
match self.array.validity() {
Some(bitmap) => Validity::Slots(bitmap),
None => Validity::AllValid,
}
vectors::impl_validity_for_vector!(self.array)
}
fn is_null(&self, row: usize) -> bool {
@@ -95,8 +92,8 @@ impl<T: Primitive + DataTypeBuilder> Vector for PrimitiveVector<T> {
Arc::new(Self::from(self.array.slice(offset, length)))
}
fn get_unchecked(&self, index: usize) -> Value {
self.array.value(index).into()
fn get(&self, index: usize) -> Value {
vectors::impl_get_for_vector!(self.array, index)
}
fn replicate(&self, offsets: &[usize]) -> VectorRef {
@@ -288,7 +285,7 @@ mod tests {
for i in 0..4 {
assert!(!v.is_null(i));
assert_eq!(Value::Int32(i as i32 + 1), v.get_unchecked(i));
assert_eq!(Value::Int32(i as i32 + 1), v.get(i));
}
let json_value = v.serialize_to_json().unwrap();
@@ -352,6 +349,7 @@ mod tests {
for (i, v) in input.into_iter().enumerate() {
assert_eq!(v, vector.get_data(i));
assert_eq!(Value::from(v), vector.get(i));
}
let res: Vec<_> = vector.iter_data().collect();
@@ -388,7 +386,7 @@ mod tests {
assert_eq!(4, v.len());
for i in 0..4 {
assert_eq!(Value::Int32(i as i32 + 1), v.get_unchecked(i));
assert_eq!(Value::Int32(i as i32 + 1), v.get(i));
}
}
}

View File

@@ -10,12 +10,11 @@ use snafu::ResultExt;
use crate::arrow_array::{MutableStringArray, StringArray};
use crate::data_type::ConcreteDataType;
use crate::error::SerializeSnafu;
use crate::prelude::{MutableVector, ScalarVectorBuilder, Validity, Vector, VectorRef};
use crate::scalars::{common, ScalarVector};
use crate::scalars::{common, ScalarVector, ScalarVectorBuilder};
use crate::serialize::Serializable;
use crate::types::StringType;
use crate::value::Value;
use crate::vectors::impl_try_from_arrow_array_for_vector;
use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
/// String array wrapper
#[derive(Debug, Clone)]
@@ -91,10 +90,7 @@ impl Vector for StringVector {
}
fn validity(&self) -> Validity {
match self.array.validity() {
Some(bitmap) => Validity::Slots(bitmap),
None => Validity::AllValid,
}
vectors::impl_validity_for_vector!(self.array)
}
fn is_null(&self, row: usize) -> bool {
@@ -105,8 +101,8 @@ impl Vector for StringVector {
Arc::new(Self::from(self.array.slice(offset, length)))
}
fn get_unchecked(&self, index: usize) -> Value {
self.array.value(index).into()
fn get(&self, index: usize) -> Value {
vectors::impl_get_for_vector!(self.array, index)
}
fn replicate(&self, offsets: &[usize]) -> VectorRef {
@@ -191,7 +187,7 @@ impl Serializable for StringVector {
}
}
impl_try_from_arrow_array_for_vector!(StringArray, StringVector);
vectors::impl_try_from_arrow_array_for_vector!(StringArray, StringVector);
#[cfg(test)]
mod tests {
@@ -211,8 +207,8 @@ mod tests {
assert!(!v.only_null());
for (i, s) in strs.iter().enumerate() {
assert_eq!(Value::from(*s), v.get_unchecked(i));
assert_eq!(Value::from(*s), v.get(i).unwrap());
assert_eq!(Value::from(*s), v.get(i));
assert_eq!(Value::from(*s), v.try_get(i).unwrap());
}
let arrow_arr = v.to_arrow_array();
@@ -259,6 +255,13 @@ mod tests {
assert_eq!(None, vector.get_data(1));
assert_eq!(Some("world"), vector.get_data(2));
// Get out of bound
assert!(vector.try_get(3).is_err());
assert_eq!(Value::String("hello".into()), vector.get(0));
assert_eq!(Value::Null, vector.get(1));
assert_eq!(Value::String("world".into()), vector.get(2));
let mut iter = vector.iter_data();
assert_eq!("hello", iter.next().unwrap().unwrap());
assert_eq!(None, iter.next().unwrap());

View File

@@ -12,5 +12,5 @@ tokio = { version = "1.0", features = ["full"] }
[dev-dependencies]
anyhow = "1.0"
common-telemetry = { path = "../common/telemetry"}
common-telemetry = { path = "../common/telemetry" }
tempdir = "0.3"

View File

@@ -19,15 +19,11 @@ pub trait SchemaProvider: Sync + Send {
/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name existed before, it returns "Table already exists" error.
fn register_table(&self, _name: String, _table: TableRef) -> Result<Option<TableRef>> {
todo!();
}
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>>;
/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
fn deregister_table(&self, _name: &str) -> Result<Option<TableRef>> {
todo!();
}
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>>;
/// If supported by the implementation, checks the table exist in the schema provider or not.
/// If no matched table in the schema provider, return false.

View File

@@ -124,7 +124,12 @@ impl PhysicalPlanner for DatafusionQueryEngine {
})?;
Ok(Arc::new(PhysicalPlanAdapter::new(
Arc::new(physical_plan.schema().into()),
Arc::new(
physical_plan
.schema()
.try_into()
.context(error::ConvertSchemaSnafu)?,
),
physical_plan,
)))
}

View File

@@ -13,7 +13,7 @@ use datafusion::execution::runtime_env::RuntimeEnv;
use snafu::ResultExt;
use table::{
table::adapter::{DfTableProviderAdapter, TableAdapter},
Table,
TableRef,
};
use crate::catalog::{schema::SchemaProvider, CatalogListRef, CatalogProvider};
@@ -150,7 +150,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
name: String,
table: Arc<dyn DfTableProvider>,
) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
let table = Arc::new(TableAdapter::new(table, self.runtime.clone()));
let table = Arc::new(TableAdapter::new(table, self.runtime.clone())?);
match self.schema_provider.register_table(name, table)? {
Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
None => Ok(None),
@@ -185,35 +185,46 @@ impl SchemaProvider for SchemaProviderAdapter {
self.df_schema_provider.table_names()
}
fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
fn table(&self, name: &str) -> Option<TableRef> {
self.df_schema_provider.table(name).map(|table_provider| {
Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _
match table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
{
Some(adapter) => adapter.table(),
None => {
// TODO(yingwen): Avoid panic here.
let adapter = TableAdapter::new(table_provider, self.runtime.clone())
.expect("convert datafusion table");
Arc::new(adapter) as _
}
}
})
}
fn register_table(
&self,
name: String,
table: Arc<dyn Table>,
) -> Result<Option<Arc<dyn Table>>> {
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
Ok(self
.df_schema_provider
.register_table(name, table_provider)
.context(error::DatafusionSnafu {
msg: "Fail to register table to datafusion",
})?
.map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)))
.map(|_| table))
}
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn Table>>> {
Ok(self
.df_schema_provider
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
self.df_schema_provider
.deregister_table(name)
.context(error::DatafusionSnafu {
msg: "Fail to deregister table from datafusion",
})?
.map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))
.map(|table| {
let adapter = TableAdapter::new(table, self.runtime.clone())
.context(error::ConvertTableSnafu)?;
Ok(Arc::new(adapter) as _)
})
.transpose()
}
fn table_exist(&self, name: &str) -> bool {

View File

@@ -32,6 +32,18 @@ pub enum InnerError {
source: DataFusionError,
backtrace: Backtrace,
},
#[snafu(display("Fail to convert arrow schema, source: {}", source))]
ConvertSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Fail to convert table, source: {}", source))]
ConvertTable {
#[snafu(backtrace)]
source: table::error::Error,
},
}
impl ErrorExt for InnerError {
@@ -42,7 +54,9 @@ impl ErrorExt for InnerError {
// TODO(yingwen): Further categorize datafusion error.
Datafusion { .. } => StatusCode::EngineExecuteQuery,
// This downcast should not fail in usual case.
PhysicalPlanDowncast { .. } => StatusCode::Unexpected,
PhysicalPlanDowncast { .. } | ConvertSchema { .. } | ConvertTable { .. } => {
StatusCode::Unexpected
}
ParseSql { source, .. } => source.status_code(),
PlanSql { .. } => StatusCode::PlanQuery,
}

View File

@@ -97,7 +97,10 @@ impl PhysicalPlan for PhysicalPlanAdapter {
msg: "Fail to execute physical plan",
})?;
Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream)))
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
df_stream,
)))
}
fn as_any(&self) -> &dyn Any {

16
src/storage/Cargo.toml Normal file
View File

@@ -0,0 +1,16 @@
[package]
name = "storage"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arc-swap = "1.0"
async-trait = "0.1"
common-error = { path = "../common/error" }
common-telemetry = { path = "../common/telemetry" }
datatypes = { path = "../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
tokio = { version = "1.18", features = ["full"] }

134
src/storage/src/engine.rs Normal file
View File

@@ -0,0 +1,134 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_telemetry::logging::info;
use snafu::ResultExt;
use store_api::storage::{EngineContext, RegionDescriptor, StorageEngine};
use crate::error::{self, Error, Result};
use crate::region::RegionImpl;
/// [StorageEngine] implementation.
///
/// The engine state lives behind an [`Arc`], so cloning an `EngineImpl` yields
/// another handle to the same [`EngineInner`].
#[derive(Clone)]
pub struct EngineImpl {
/// Shared engine state.
inner: Arc<EngineInner>,
}
// Only `create_region` and `get_region` are implemented so far; the remaining
// methods are stubs that panic via `unimplemented!()`.
#[async_trait]
impl StorageEngine for EngineImpl {
type Error = Error;
type Region = RegionImpl;
/// Not implemented yet.
///
/// # Panics
/// Always panics.
async fn open_region(&self, _ctx: &EngineContext, _name: &str) -> Result<RegionImpl> {
unimplemented!()
}
/// Not implemented yet.
///
/// # Panics
/// Always panics.
async fn close_region(&self, _ctx: &EngineContext, _region: RegionImpl) -> Result<()> {
unimplemented!()
}
/// Creates a region from `descriptor` by delegating to [`EngineInner::create_region`].
///
/// The context is currently unused.
async fn create_region(
&self,
_ctx: &EngineContext,
descriptor: RegionDescriptor,
) -> Result<RegionImpl> {
self.inner.create_region(descriptor).await
}
/// Not implemented yet.
///
/// # Panics
/// Always panics.
async fn drop_region(&self, _ctx: &EngineContext, _region: RegionImpl) -> Result<()> {
unimplemented!()
}
/// Looks up a region by name; returns `Ok(None)` when no such region is registered.
///
/// This implementation never returns `Err`.
fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result<Option<RegionImpl>> {
Ok(self.inner.get_region(name))
}
}
impl EngineImpl {
    /// Builds an engine backed by a default-initialized [`EngineInner`].
    pub fn new() -> EngineImpl {
        let inner = Arc::new(EngineInner::default());
        Self { inner }
    }
}
impl Default for EngineImpl {
fn default() -> Self {
Self::new()
}
}
/// Maps a region name to its region handle.
type RegionMap = HashMap<String, RegionImpl>;
/// State shared by all clones of [`EngineImpl`].
#[derive(Default)]
struct EngineInner {
/// Regions known to the engine, keyed by region name.
regions: RwLock<RegionMap>,
}
impl EngineInner {
async fn create_region(&self, descriptor: RegionDescriptor) -> Result<RegionImpl> {
{
let regions = self.regions.read().unwrap();
if let Some(region) = regions.get(&descriptor.name) {
return Ok(region.clone());
}
}
let region_name = descriptor.name.clone();
let metadata = descriptor
.try_into()
.context(error::InvalidRegionDescSnafu {
region: &region_name,
})?;
let region = RegionImpl::new(region_name.clone(), metadata);
{
let mut regions = self.regions.write().unwrap();
if let Some(region) = regions.get(&region_name) {
return Ok(region.clone());
}
regions.insert(region_name.clone(), region.clone());
}
// TODO(yingwen): Persist region metadata to log.
// TODO(yingwen): Impl Debug format for region and print region info briefly in log.
info!("Storage engine create region {}", region_name);
Ok(region)
}
fn get_region(&self, name: &str) -> Option<RegionImpl> {
self.regions.read().unwrap().get(name).cloned()
}
}
#[cfg(test)]
mod tests {
use datatypes::type_id::LogicalTypeId;
use store_api::storage::Region;
use super::*;
use crate::test_util::descriptor_util::RegionDescBuilder;
/// Creating a region makes it retrievable by name; unknown names yield `None`.
#[tokio::test]
async fn test_create_new_region() {
let engine = EngineImpl::new();
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.build();
let ctx = EngineContext::default();
let region = engine.create_region(&ctx, desc).await.unwrap();
assert_eq!(region_name, region.name());
// The created region must be visible through `get_region`.
let region2 = engine.get_region(&ctx, region_name).unwrap().unwrap();
assert_eq!(region_name, region2.name());
assert!(engine.get_region(&ctx, "no such region").unwrap().is_none());
}
}

75
src/storage/src/error.rs Normal file
View File

@@ -0,0 +1,75 @@
use std::any::Any;
use common_error::prelude::*;
use crate::metadata::Error as MetadataError;
/// Errors returned by the storage engine.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
/// A region descriptor could not be converted into region metadata.
#[snafu(display("Invalid region descriptor, region: {}, source: {}", region, source))]
InvalidRegionDesc {
region: String,
#[snafu(backtrace)]
source: MetadataError,
},
/// The schema of the data to write does not match the region's schema.
#[snafu(display("Invalid schema of input data, region: {}", region))]
InvalidInputSchema {
region: String,
backtrace: Backtrace,
},
/// A write batch lacks a column that is not nullable.
#[snafu(display("Missing column {} in write batch", column))]
BatchMissingColumn {
column: String,
backtrace: Backtrace,
},
}
/// Result type whose error is the storage [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
InvalidRegionDesc { .. } | InvalidInputSchema { .. } | BatchMissingColumn { .. } => {
StatusCode::InvalidArguments
}
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use snafu::GenerateImplicitData;
use super::*;
/// Always fails with a metadata error that carries a freshly generated backtrace.
fn throw_metadata_error() -> std::result::Result<(), MetadataError> {
Err(MetadataError::CfIdExists {
id: 1,
backtrace: Backtrace::generate(),
})
}
/// Wrapping a metadata error keeps its backtrace and reports `InvalidArguments`.
#[test]
fn test_invalid_region_desc_error() {
let err = throw_metadata_error()
.context(InvalidRegionDescSnafu { region: "hello" })
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
assert!(err.backtrace_opt().is_some());
}
}

17
src/storage/src/lib.rs Normal file
View File

@@ -0,0 +1,17 @@
//! Storage engine implementation.
mod engine;
mod error;
mod memtable;
pub mod metadata;
mod region;
mod region_writer;
mod snapshot;
pub mod sync;
mod version;
mod write_batch;
#[cfg(test)]
mod test_util;
pub use engine::EngineImpl;

View File

@@ -0,0 +1,99 @@
mod btree;
mod inserter;
mod schema;
use std::mem;
use std::sync::Arc;
use datatypes::vectors::VectorRef;
use store_api::storage::{SequenceNumber, ValueType};
use crate::error::Result;
use crate::memtable::btree::BTreeMemtable;
pub use crate::memtable::inserter::Inserter;
pub use crate::memtable::schema::MemtableSchema;
/// In memory storage.
pub trait Memtable: Send + Sync {
/// Returns the schema of this memtable.
fn schema(&self) -> &MemtableSchema;
/// Write key/values to the memtable.
///
/// # Panics
/// Panic if the schema of key/value differs from memtable's schema.
fn write(&self, kvs: &KeyValues) -> Result<()>;
/// Returns the estimated bytes allocated by this memtable from heap.
fn bytes_allocated(&self) -> usize;
}
/// Shared, reference-counted handle to a [`Memtable`].
pub type MemtableRef = Arc<dyn Memtable>;
/// Factory of memtables.
pub trait MemtableBuilder: Send + Sync {
/// Builds a new memtable for the given `schema`.
fn build(&self, schema: MemtableSchema) -> MemtableRef;
}
/// Shared, reference-counted handle to a [`MemtableBuilder`].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
// TODO(yingwen): Maybe use individual vector for timestamp and version.
/// Key-value pairs in columnar format.
pub struct KeyValues {
/// Sequence number shared by all rows in this struct.
pub sequence: SequenceNumber,
/// Value type shared by all rows in this struct.
pub value_type: ValueType,
/// Start index of these key-value pairs in batch.
pub start_index_in_batch: usize,
/// Key columns, one vector per column.
pub keys: Vec<VectorRef>,
/// Value columns, one vector per column.
pub values: Vec<VectorRef>,
}
impl KeyValues {
    /// Empties both column lists and records the new value type and start index,
    /// so the struct can be reused as a buffer.
    // Note that `sequence` is not reset.
    fn reset(&mut self, value_type: ValueType, index_in_batch: usize) {
        self.keys.clear();
        self.values.clear();
        self.start_index_in_batch = index_in_batch;
        self.value_type = value_type;
    }
}
impl KeyValues {
    /// Number of rows, taken from the length of the first key column
    /// (zero when there are no key columns).
    pub fn len(&self) -> usize {
        self.keys.first().map_or(0, |column| column.len())
    }
}
/// [`MemtableBuilder`] that produces [`BTreeMemtable`]s.
pub struct DefaultMemtableBuilder {}

impl MemtableBuilder for DefaultMemtableBuilder {
    /// Wraps a new [`BTreeMemtable`] with the given `schema` in an [`Arc`].
    fn build(&self, schema: MemtableSchema) -> MemtableRef {
        let memtable = BTreeMemtable::new(schema);
        Arc::new(memtable)
    }
}
pub struct MemtableSet {
mem: MemtableRef,
// TODO(yingwen): Support multiple immutable memtables.
_immem: Option<MemtableRef>,
}
impl MemtableSet {
pub fn new(mem: MemtableRef) -> MemtableSet {
MemtableSet { mem, _immem: None }
}
pub fn mutable_memtable(&self) -> &MemtableRef {
&self.mem
}
/// Switch mutable memtable to immutable memtable, returns the old mutable memtable if success.
pub fn _switch_memtable(&mut self, mem: &MemtableRef) -> std::result::Result<MemtableRef, ()> {
match &self._immem {
Some(_) => Err(()),
None => {
let old_mem = mem::replace(&mut self.mem, mem.clone());
self._immem = Some(old_mem.clone());
Ok(old_mem)
}
}
}
}

View File

@@ -0,0 +1,136 @@
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::RwLock;
use datatypes::value::Value;
use store_api::storage::{SequenceNumber, ValueType};
use crate::error::Result;
use crate::memtable::{KeyValues, Memtable, MemtableSchema};
/// Memtable backed by the standard library's [`BTreeMap`], guarded by a [`RwLock`].
///
/// Mainly for test purpose.
pub struct BTreeMemtable {
    schema: MemtableSchema,
    map: RwLock<BTreeMap<RowKey, RowValue>>,
}

impl BTreeMemtable {
    /// Creates an empty memtable with the given `schema`.
    pub fn new(schema: MemtableSchema) -> BTreeMemtable {
        let map = RwLock::new(BTreeMap::new());
        Self { schema, map }
    }
}
impl Memtable for BTreeMemtable {
    fn schema(&self) -> &MemtableSchema {
        &self.schema
    }

    /// Inserts every row of `kvs` into the map while holding the write lock.
    ///
    /// # Panics
    /// Panics if the lock is poisoned.
    fn write(&self, kvs: &KeyValues) -> Result<()> {
        let mut rows = self.map.write().unwrap();
        rows.extend(IterRow::new(kvs));
        Ok(())
    }

    /// Not implemented yet.
    ///
    /// # Panics
    /// Always panics.
    fn bytes_allocated(&self) -> usize {
        unimplemented!()
    }
}
/// Iterator that turns the columnar [`KeyValues`] into `(RowKey, RowValue)` pairs,
/// one pair per row.
struct IterRow<'a> {
    /// Columnar source of the rows.
    kvs: &'a KeyValues,
    /// Index of the next row to yield.
    index: usize,
    /// Total number of rows in `kvs`.
    len: usize,
}

impl<'a> IterRow<'a> {
    /// Creates an iterator positioned at the first row of `kvs`.
    fn new(kvs: &KeyValues) -> IterRow {
        IterRow {
            kvs,
            index: 0,
            len: kvs.len(),
        }
    }

    /// Builds the row at the current index and advances the cursor by one row.
    ///
    /// The caller must ensure `self.index < self.len`.
    fn fetch_row(&mut self) -> (RowKey, RowValue) {
        let row = self.index;

        let keys = self
            .kvs
            .keys
            .iter()
            .map(|vector| vector.get(row))
            .collect();
        let row_key = RowKey {
            keys,
            sequence: self.kvs.sequence,
            index_in_batch: self.kvs.start_index_in_batch + row,
            value_type: self.kvs.value_type,
        };

        let row_value = RowValue {
            _values: self
                .kvs
                .values
                .iter()
                .map(|vector| vector.get(row))
                .collect(),
        };

        // Move on to the next row; without this the iterator would yield row 0
        // forever and never terminate.
        self.index += 1;

        (row_key, row_value)
    }
}

impl<'a> Iterator for IterRow<'a> {
    type Item = (RowKey, RowValue);

    fn next(&mut self) -> Option<(RowKey, RowValue)> {
        if self.index >= self.len {
            return None;
        }

        Some(self.fetch_row())
    }

    /// Reports the exact number of rows left to yield.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // The hint is about remaining rows, not about the number of key columns.
        let remaining = self.len.saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}
// TODO(yingwen): Actually the version and timestamp may order desc.
/// Key of a row stored in [`BTreeMemtable`].
#[derive(PartialEq, Eq)]
struct RowKey {
    keys: Vec<Value>,
    sequence: SequenceNumber,
    index_in_batch: usize,
    value_type: ValueType,
}

impl Ord for RowKey {
    /// Compares by keys ascending, then sequence, index in batch and value type
    /// descending. `(keys, sequence, index_in_batch)` should already be enough to
    /// tell two rows apart.
    fn cmp(&self, other: &RowKey) -> Ordering {
        let by_keys = self.keys.cmp(&other.keys);
        if by_keys != Ordering::Equal {
            return by_keys;
        }

        // Operands are swapped below to get descending order.
        let by_sequence = other.sequence.cmp(&self.sequence);
        if by_sequence != Ordering::Equal {
            return by_sequence;
        }

        let by_index = other.index_in_batch.cmp(&self.index_in_batch);
        if by_index != Ordering::Equal {
            return by_index;
        }

        other.value_type.cmp(&self.value_type)
    }
}

impl PartialOrd for RowKey {
    fn partial_cmp(&self, other: &RowKey) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Values of a row stored in [`BTreeMemtable`], one entry per value column.
struct RowValue {
// Stored but not read yet, hence the leading underscore.
_values: Vec<Value>,
}

View File

@@ -0,0 +1,102 @@
use std::sync::Arc;
use datatypes::vectors::{NullVector, VectorRef};
use snafu::ensure;
use store_api::storage::{ColumnDescriptor, SequenceNumber, ValueType};
use crate::error::{self, Result};
use crate::memtable::{KeyValues, Memtable};
use crate::write_batch::{Mutation, PutData, WriteBatch};
/// Moves the key/values of a [WriteBatch] into a [Memtable].
pub struct Inserter {
    /// Sequence of the batch to be inserted.
    sequence: SequenceNumber,
    /// Index, within the batch, of the next row to insert.
    index_in_batch: usize,
}

impl Inserter {
    /// Creates an inserter for a batch with the given `sequence`, starting at row 0.
    pub fn new(sequence: SequenceNumber) -> Inserter {
        Self {
            sequence,
            index_in_batch: 0,
        }
    }

    // TODO(yingwen): Can we take the WriteBatch?
    /// Insert write batch into memtable.
    ///
    /// Won't do schema validation.
    ///
    /// # Errors
    /// Propagates any error from collecting the columns or writing to the memtable.
    pub fn insert_memtable(&mut self, batch: &WriteBatch, memtable: &dyn Memtable) -> Result<()> {
        if batch.is_empty() {
            return Ok(());
        }

        let schema = memtable.schema();
        // One buffer reused for every mutation in the batch.
        let mut buffer = KeyValues {
            sequence: self.sequence,
            value_type: ValueType::Put,
            start_index_in_batch: self.index_in_batch,
            keys: Vec::with_capacity(schema.num_row_key_columns()),
            values: Vec::with_capacity(schema.num_value_columns()),
        };

        for mutation in batch {
            match mutation {
                Mutation::Put(put_data) => self.put_impl(put_data, memtable, &mut buffer)?,
            }
        }

        Ok(())
    }

    /// Writes one put mutation to `memtable`, using `kvs` as scratch space, then
    /// advances the in-batch row index by the number of rows written.
    fn put_impl(
        &mut self,
        put_data: &PutData,
        memtable: &dyn Memtable,
        kvs: &mut KeyValues,
    ) -> Result<()> {
        let schema = memtable.schema();
        let row_count = put_data.num_rows();

        kvs.reset(ValueType::Put, self.index_in_batch);

        schema
            .row_key_columns()
            .try_for_each(|column| clone_put_data_column_to(put_data, &column.desc, &mut kvs.keys))?;
        schema
            .value_columns()
            .try_for_each(|column| clone_put_data_column_to(put_data, &column.desc, &mut kvs.values))?;

        memtable.write(kvs)?;
        self.index_in_batch += row_count;

        Ok(())
    }
}
/// Appends the column described by `desc` to `target`.
///
/// The vector is taken from `put_data` when present. Otherwise a null vector with
/// as many rows as `put_data` is appended.
///
/// # Errors
/// Returns `BatchMissingColumn` if the column is absent and not nullable.
fn clone_put_data_column_to(
    put_data: &PutData,
    desc: &ColumnDescriptor,
    target: &mut Vec<VectorRef>,
) -> Result<()> {
    let column: VectorRef = match put_data.column_by_name(&desc.name) {
        Some(vector) => vector.clone(),
        None => {
            // The write batch should have been validated before.
            ensure!(
                desc.is_nullable,
                error::BatchMissingColumnSnafu { column: &desc.name }
            );
            Arc::new(NullVector::new(put_data.num_rows()))
        }
    };
    target.push(column);

    Ok(())
}

View File

@@ -0,0 +1,31 @@
use crate::metadata::{ColumnMetadata, ColumnsRowKeyMetadataRef};
/// Schema of a memtable; a thin wrapper over the region's column/row-key metadata.
pub struct MemtableSchema {
columns_row_key: ColumnsRowKeyMetadataRef,
}
impl MemtableSchema {
/// Wraps the given column/row-key metadata.
pub fn new(columns_row_key: ColumnsRowKeyMetadataRef) -> MemtableSchema {
MemtableSchema { columns_row_key }
}
/// Iterates over the row key columns.
#[inline]
pub fn row_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
self.columns_row_key.iter_row_key_columns()
}
/// Iterates over the value columns.
#[inline]
pub fn value_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
self.columns_row_key.iter_value_columns()
}
/// Number of row key columns.
#[inline]
pub fn num_row_key_columns(&self) -> usize {
self.columns_row_key.num_row_key_columns()
}
/// Number of value columns.
#[inline]
pub fn num_value_columns(&self) -> usize {
self.columns_row_key.num_value_columns()
}
}

460
src/storage/src/metadata.rs Normal file
View File

@@ -0,0 +1,460 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_error::prelude::*;
use datatypes::data_type::ConcreteDataType;
use snafu::ensure;
use store_api::storage::{
consts, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId,
ColumnId, ColumnSchema, RegionDescriptor, RegionMeta, RowKeyDescriptor, Schema, SchemaRef,
};
/// Error for handling metadata.
#[derive(Debug, Snafu)]
pub enum Error {
/// A column with the same name has already been added.
#[snafu(display("Column name already exists, name: {}", name))]
ColNameExists { name: String, backtrace: Backtrace },
/// A column family with the same name has already been added.
#[snafu(display("Column family name already exists, name: {}", name))]
CfNameExists { name: String, backtrace: Backtrace },
/// A column family with the same id has already been added.
// NOTE(review): `id` holds a column family id but is typed `ColumnId`; presumably
// both aliases share one underlying type, consider `ColumnFamilyId` here.
#[snafu(display("Column family id already exists, id: {}", id))]
CfIdExists { id: ColumnId, backtrace: Backtrace },
}
/// Result type whose error is the metadata [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// [RegionMeta] backed by a shared [`RegionMetadata`].
///
/// Holds a snapshot of region metadata.
pub struct RegionMetaImpl {
    metadata: RegionMetadataRef,
}

impl RegionMetaImpl {
    /// Wraps the given metadata snapshot.
    pub fn new(metadata: RegionMetadataRef) -> RegionMetaImpl {
        Self { metadata }
    }
}

impl RegionMeta for RegionMetaImpl {
    /// Returns the schema stored in the wrapped metadata.
    fn schema(&self) -> &SchemaRef {
        let snapshot = &self.metadata;
        &snapshot.schema
    }
}
/// Version number of region metadata.
pub type VersionNumber = u32;
// TODO(yingwen): Make some fields of metadata private.
/// In memory metadata of region.
#[derive(Clone)]
pub struct RegionMetadata {
/// Schema of the region.
///
/// Holding a [SchemaRef] to allow converting into `SchemaRef`/`arrow::SchemaRef`
/// conveniently. The fields order in `SchemaRef` **must** be consistent with
/// columns order in [ColumnsMetadata] to ensure the projection index of a field
/// is correct.
pub schema: SchemaRef,
/// Columns of the region together with the row key layout.
pub columns_row_key: ColumnsRowKeyMetadataRef,
/// Column families of the region.
pub column_families: ColumnFamiliesMetadata,
/// Version of the metadata. Version is set to zero initially and bumped once the
/// metadata have been altered.
pub version: VersionNumber,
}
/// Shared, reference-counted handle to [`RegionMetadata`].
pub type RegionMetadataRef = Arc<RegionMetadata>;
/// Metadata of a single column.
#[derive(Clone)]
pub struct ColumnMetadata {
/// Id of the column family this column was added under.
pub cf_id: ColumnFamilyId,
/// Descriptor of the column.
pub desc: ColumnDescriptor,
}
/// All columns of a region plus a name index.
#[derive(Clone)]
pub struct ColumnsMetadata {
/// All columns, in `(key columns, timestamp, [version,] value columns)` order.
///
/// Columns order should be consistent with fields order in [SchemaRef].
pub columns: Vec<ColumnMetadata>,
/// Maps column name to index of columns, used to fast lookup column by name.
pub name_to_col_index: HashMap<String, usize>,
}
/// Layout of the row key columns within the column list.
#[derive(Default, Clone)]
pub struct RowKeyMetadata {
/// Exclusive end index of row key columns.
row_key_end: usize,
/// Index of timestamp key column.
pub timestamp_key_index: usize,
/// If version column is enabled, then the last column of key columns is a
/// version column.
pub enable_version_column: bool,
}
/// Columns of a region together with the layout of its row key.
#[derive(Clone)]
pub struct ColumnsRowKeyMetadata {
    columns: ColumnsMetadata,
    row_key: RowKeyMetadata,
}

impl ColumnsRowKeyMetadata {
    /// Iterates over the row key columns, i.e. the columns before `row_key_end`.
    pub fn iter_row_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
        let key_count = self.row_key.row_key_end;
        self.columns.columns.iter().take(key_count)
    }

    /// Iterates over the value columns, i.e. the columns from `row_key_end` onwards.
    pub fn iter_value_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
        let key_count = self.row_key.row_key_end;
        self.columns.columns.iter().skip(key_count)
    }

    /// Number of row key columns.
    #[inline]
    pub fn num_row_key_columns(&self) -> usize {
        self.row_key.row_key_end
    }

    /// Number of value columns: all columns minus the row key columns.
    #[inline]
    pub fn num_value_columns(&self) -> usize {
        let total = self.columns.columns.len();
        total - self.row_key.row_key_end
    }
}

/// Shared, reference-counted handle to [`ColumnsRowKeyMetadata`].
pub type ColumnsRowKeyMetadataRef = Arc<ColumnsRowKeyMetadata>;
/// Metadata of all column families of a region.
#[derive(Clone)]
pub struct ColumnFamiliesMetadata {
/// Map column family id to column family metadata.
id_to_cfs: HashMap<ColumnFamilyId, ColumnFamilyMetadata>,
}
impl ColumnFamiliesMetadata {
/// Returns the metadata of the column family with id `cf_id`, if it exists.
pub fn cf_by_id(&self, cf_id: ColumnFamilyId) -> Option<&ColumnFamilyMetadata> {
self.id_to_cfs.get(&cf_id)
}
}
/// Metadata of a single column family.
#[derive(Clone)]
pub struct ColumnFamilyMetadata {
/// Column family name.
pub name: String,
/// Column family id.
pub cf_id: ColumnFamilyId,
/// Inclusive start index of columns in the column family.
pub column_index_start: usize,
/// Exclusive end index of columns in the column family.
pub column_index_end: usize,
}
impl TryFrom<RegionDescriptor> for RegionMetadata {
    type Error = Error;

    /// Builds metadata from a descriptor: row key first, then the default column
    /// family, then the extra column families in order.
    ///
    /// # Errors
    /// Fails on the first duplicated column name, column family name or column family id.
    fn try_from(desc: RegionDescriptor) -> Result<RegionMetadata> {
        // Doesn't set version explicitly here, because this is a new region meta
        // created from descriptor, using initial version is reasonable.
        let seeded = RegionMetadataBuilder::new()
            .row_key(desc.row_key)?
            .add_column_family(desc.default_cf)?;

        let builder = desc
            .extra_cfs
            .into_iter()
            .try_fold(seeded, |acc, cf| acc.add_column_family(cf))?;

        Ok(builder.build())
    }
}
/// Incrementally assembles a [`RegionMetadata`].
#[derive(Default)]
struct RegionMetadataBuilder {
/// Columns added so far, in insertion order.
columns: Vec<ColumnMetadata>,
/// Schema of each column in `columns`, pushed in the same order.
column_schemas: Vec<ColumnSchema>,
/// Maps column name to its index in `columns`.
name_to_col_index: HashMap<String, usize>,
/// Row key layout, set by `row_key()`.
row_key: RowKeyMetadata,
/// Column families added so far, keyed by id.
id_to_cfs: HashMap<ColumnFamilyId, ColumnFamilyMetadata>,
/// Names of the column families added so far, used to reject duplicates.
cf_names: HashSet<String>,
}
impl RegionMetadataBuilder {
    /// Returns a builder with no columns and no column families.
    fn new() -> RegionMetadataBuilder {
        Self::default()
    }

    /// Adds the row key columns: the key columns, then the timestamp column, then
    /// the version column if enabled. Records the resulting row key layout.
    ///
    /// # Errors
    /// Returns `ColNameExists` if a column name is already taken.
    fn row_key(mut self, key: RowKeyDescriptor) -> Result<Self> {
        key.columns
            .into_iter()
            .try_for_each(|col| self.push_row_key_column(col))?;

        // TODO(yingwen): Validate this is a timestamp column.
        let timestamp_key_index = self.columns.len();
        self.push_row_key_column(key.timestamp)?;

        if key.enable_version_column {
            // TODO(yingwen): Validate that version column must be uint64 column.
            self.push_row_key_column(version_column_desc())?;
        }

        self.row_key = RowKeyMetadata {
            row_key_end: self.columns.len(),
            timestamp_key_index,
            enable_version_column: key.enable_version_column,
        };

        Ok(self)
    }

    /// Adds a column family and all of its columns.
    ///
    /// # Errors
    /// Returns `CfIdExists` / `CfNameExists` if the id or name is already used, or
    /// `ColNameExists` if one of its columns duplicates an existing column name.
    fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result<Self> {
        ensure!(
            !self.id_to_cfs.contains_key(&cf.cf_id),
            CfIdExistsSnafu { id: cf.cf_id }
        );
        ensure!(
            !self.cf_names.contains(&cf.name),
            CfNameExistsSnafu { name: &cf.name }
        );

        let cf_id = cf.cf_id;
        // The family's columns occupy the range [start, end) of `self.columns`.
        let column_index_start = self.columns.len();
        let column_index_end = column_index_start + cf.columns.len();
        for col in cf.columns {
            self.push_value_column(cf_id, col)?;
        }

        self.id_to_cfs.insert(
            cf_id,
            ColumnFamilyMetadata {
                name: cf.name.clone(),
                cf_id,
                column_index_start,
                column_index_end,
            },
        );
        self.cf_names.insert(cf.name);

        Ok(self)
    }

    /// Consumes the builder and produces metadata with version 0.
    fn build(self) -> RegionMetadata {
        let schema = Arc::new(Schema::new(self.column_schemas));
        let columns_row_key = Arc::new(ColumnsRowKeyMetadata {
            columns: ColumnsMetadata {
                columns: self.columns,
                name_to_col_index: self.name_to_col_index,
            },
            row_key: self.row_key,
        });
        let column_families = ColumnFamiliesMetadata {
            id_to_cfs: self.id_to_cfs,
        };

        RegionMetadata {
            schema,
            columns_row_key,
            column_families,
            version: 0,
        }
    }

    // Helper methods:

    /// Adds a row key column; key columns are filed under `consts::KEY_CF_ID`.
    fn push_row_key_column(&mut self, desc: ColumnDescriptor) -> Result<()> {
        self.push_value_column(consts::KEY_CF_ID, desc)
    }

    /// Appends a column under column family `cf_id` and indexes it by name.
    ///
    /// # Errors
    /// Returns `ColNameExists` if a column with the same name was already added.
    fn push_value_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> {
        ensure!(
            !self.name_to_col_index.contains_key(&desc.name),
            ColNameExistsSnafu { name: &desc.name }
        );

        // The new column goes to the end, so its index is the current length.
        let column_index = self.columns.len();
        self.name_to_col_index.insert(desc.name.clone(), column_index);
        self.column_schemas.push(ColumnSchema::from(&desc));
        // TODO(yingwen): Store cf_id to metadata in field.
        self.columns.push(ColumnMetadata { cf_id, desc });

        Ok(())
    }
}
/// Descriptor of the built-in version column: a non-nullable `uint64` column using
/// the reserved version column id and name.
fn version_column_desc() -> ColumnDescriptor {
    let name = consts::VERSION_COLUMN_NAME.to_string();
    let data_type = ConcreteDataType::uint64_datatype();

    ColumnDescriptorBuilder::new(consts::VERSION_COLUMN_ID, name, data_type)
        .is_nullable(false)
        .build()
}
// TODO(yingwen): Add tests for using invalid row_key/cf to build metadata.
#[cfg(test)]
mod tests {
use datatypes::type_id::LogicalTypeId;
use store_api::storage::{
ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, RowKeyDescriptorBuilder,
};
use super::*;
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::test_util::schema_util;
/// A descriptor without version column converts to metadata whose schema lists
/// key, timestamp and value columns in that order.
#[test]
fn test_descriptor_to_region_metadata() {
let desc = RegionDescBuilder::new("region-0")
.timestamp(("ts", LogicalTypeId::UInt64, false))
.enable_version_column(false)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.build();
let expect_schema = schema_util::new_schema_ref(&[
("k1", LogicalTypeId::Int32, false),
("ts", LogicalTypeId::UInt64, false),
("v1", LogicalTypeId::Float32, true),
]);
let metadata = RegionMetadata::try_from(desc).unwrap();
assert_eq!(expect_schema, metadata.schema);
assert_eq!(2, metadata.columns_row_key.num_row_key_columns());
assert_eq!(1, metadata.columns_row_key.num_value_columns());
}
/// Building from an untouched builder yields empty metadata with version 0.
#[test]
fn test_build_empty_region_metadata() {
let metadata = RegionMetadataBuilder::default().build();
assert!(metadata.schema.column_schemas().is_empty());
assert!(metadata.columns_row_key.columns.columns.is_empty());
assert_eq!(0, metadata.columns_row_key.num_row_key_columns());
assert!(metadata
.columns_row_key
.iter_row_key_columns()
.next()
.is_none());
assert_eq!(0, metadata.columns_row_key.num_value_columns());
assert!(metadata
.columns_row_key
.iter_value_columns()
.next()
.is_none());
assert!(metadata.column_families.id_to_cfs.is_empty());
assert_eq!(0, metadata.version);
}
/// Builds metadata with key column `k1`, timestamp `ts` and value column `v1`,
/// optionally with the version column enabled.
fn new_metadata(enable_version_column: bool) -> RegionMetadata {
let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::int64_datatype())
.is_nullable(false)
.build();
let row_key = RowKeyDescriptorBuilder::new(timestamp)
.push_column(
ColumnDescriptorBuilder::new(3, "k1", ConcreteDataType::int64_datatype())
.is_nullable(false)
.build(),
)
.enable_version_column(enable_version_column)
.build();
let cf = ColumnFamilyDescriptorBuilder::new()
.push_column(
ColumnDescriptorBuilder::new(4, "v1", ConcreteDataType::int64_datatype()).build(),
)
.build();
RegionMetadataBuilder::new()
.row_key(row_key)
.unwrap()
.add_column_family(cf)
.unwrap()
.build()
}
// NOTE(review): "metedata" in the two test names below looks like a typo of "metadata".
/// Without version column the row key is `(k1, ts)` and `v1` is the only value column.
#[test]
fn test_build_metedata_disable_version() {
let metadata = new_metadata(false);
let expect_schema = schema_util::new_schema_ref(&[
("k1", LogicalTypeId::Int64, false),
("ts", LogicalTypeId::Int64, false),
("v1", LogicalTypeId::Int64, true),
]);
assert_eq!(expect_schema, metadata.schema);
// 3 columns
assert_eq!(3, metadata.columns_row_key.columns.columns.len());
// 2 row key columns
assert_eq!(2, metadata.columns_row_key.num_row_key_columns());
let row_key_names: Vec<_> = metadata
.columns_row_key
.iter_row_key_columns()
.map(|column| &column.desc.name)
.collect();
assert_eq!(["k1", "ts"], &row_key_names[..]);
// 1 value column
assert_eq!(1, metadata.columns_row_key.num_value_columns());
let value_names: Vec<_> = metadata
.columns_row_key
.iter_value_columns()
.map(|column| &column.desc.name)
.collect();
assert_eq!(["v1"], &value_names[..]);
// Check timestamp index.
assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index);
// Check version column.
assert!(!metadata.columns_row_key.row_key.enable_version_column);
assert!(metadata
.column_families
.cf_by_id(consts::DEFAULT_CF_ID)
.is_some());
assert_eq!(0, metadata.version);
}
/// With version column the row key gains a trailing version column.
#[test]
fn test_build_metedata_enable_version() {
let metadata = new_metadata(true);
let expect_schema = schema_util::new_schema_ref(&[
("k1", LogicalTypeId::Int64, false),
("ts", LogicalTypeId::Int64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("v1", LogicalTypeId::Int64, true),
]);
assert_eq!(expect_schema, metadata.schema);
// 4 columns
assert_eq!(4, metadata.columns_row_key.columns.columns.len());
// 3 row key columns
assert_eq!(3, metadata.columns_row_key.num_row_key_columns());
let row_key_names: Vec<_> = metadata
.columns_row_key
.iter_row_key_columns()
.map(|column| &column.desc.name)
.collect();
assert_eq!(
["k1", "ts", consts::VERSION_COLUMN_NAME],
&row_key_names[..]
);
// 1 value column
assert_eq!(1, metadata.columns_row_key.num_value_columns());
let value_names: Vec<_> = metadata
.columns_row_key
.iter_value_columns()
.map(|column| &column.desc.name)
.collect();
assert_eq!(["v1"], &value_names[..]);
// Check timestamp index.
assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index);
// Check version column.
assert!(metadata.columns_row_key.row_key.enable_version_column);
}
}

122
src/storage/src/region.rs Normal file
View File

@@ -0,0 +1,122 @@
use std::sync::Arc;
use async_trait::async_trait;
use snafu::ensure;
use store_api::storage::{ReadContext, Region, RegionMeta, WriteContext, WriteResponse};
use tokio::sync::Mutex;
use crate::error::{self, Error, Result};
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableSchema, MemtableSet};
use crate::metadata::{RegionMetaImpl, RegionMetadata};
use crate::region_writer::RegionWriter;
use crate::snapshot::SnapshotImpl;
use crate::version::{VersionControl, VersionControlRef};
use crate::write_batch::WriteBatch;
/// [Region] implementation.
///
/// The region state lives behind an [`Arc`], so cloning a `RegionImpl` yields
/// another handle to the same [`RegionInner`].
#[derive(Clone)]
pub struct RegionImpl {
/// Shared region state.
inner: Arc<RegionInner>,
}
// Every implemented method forwards to `RegionInner`; `snapshot` is still a stub.
#[async_trait]
impl Region for RegionImpl {
type Error = Error;
type Meta = RegionMetaImpl;
type WriteRequest = WriteBatch;
type Snapshot = SnapshotImpl;
/// Returns the name of the region.
fn name(&self) -> &str {
&self.inner.name
}
/// Returns the current in-memory metadata of the region.
fn in_memory_metadata(&self) -> RegionMetaImpl {
self.inner.in_memory_metadata()
}
/// Writes `request` to the region by delegating to [`RegionInner::write`].
async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
self.inner.write(ctx, request).await
}
/// Not implemented yet.
///
/// # Panics
/// Always panics.
fn snapshot(&self, _ctx: &ReadContext) -> Result<SnapshotImpl> {
unimplemented!()
}
}
impl RegionImpl {
    /// Creates an in-memory region called `name` from `metadata`.
    ///
    /// A fresh mutable memtable is built from the metadata's row key layout, and
    /// the same memtable builder is handed to the region's writer.
    pub fn new(name: String, metadata: RegionMetadata) -> RegionImpl {
        let memtable_builder = Arc::new(DefaultMemtableBuilder {});
        let schema = MemtableSchema::new(metadata.columns_row_key.clone());
        let memtables = MemtableSet::new(memtable_builder.build(schema));

        let inner = RegionInner {
            name,
            version: Arc::new(VersionControl::new(metadata, memtables)),
            writer: Mutex::new(RegionWriter::new(memtable_builder)),
        };

        RegionImpl {
            inner: Arc::new(inner),
        }
    }
}
/// State shared by all clones of [`RegionImpl`].
struct RegionInner {
    /// Name of the region.
    name: String,
    /// Version control of the region.
    version: VersionControlRef,
    /// Writer of the region; the mutex lets one write proceed at a time.
    writer: Mutex<RegionWriter>,
}

impl RegionInner {
    /// Wraps the metadata currently held by the version control.
    fn in_memory_metadata(&self) -> RegionMetaImpl {
        RegionMetaImpl::new(self.version.metadata())
    }

    /// Validates the schema of `request` against the region schema, then writes it
    /// through the region writer.
    ///
    /// # Errors
    /// Returns `InvalidInputSchema` if the column schemas differ, or any error
    /// reported by the writer.
    async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
        let metadata = self.in_memory_metadata();
        // Only compare column schemas.
        let expected = metadata.schema().column_schemas();
        ensure!(
            expected == request.schema().column_schemas(),
            error::InvalidInputSchemaSnafu { region: &self.name }
        );

        // Now altering schema is not allowed, so it is safe to validate schema outside of the lock.
        let mut guard = self.writer.lock().await;
        guard.write(ctx, &self.version, request).await
    }
}
#[cfg(test)]
mod tests {
use datatypes::type_id::LogicalTypeId;
use store_api::storage::consts;
use super::*;
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::test_util::schema_util;
/// A new region reports its name and a schema of key, timestamp, version and
/// value columns.
#[test]
fn test_new_region() {
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v1", LogicalTypeId::Float32, true))
.build();
let metadata = desc.try_into().unwrap();
let region = RegionImpl::new(region_name.to_string(), metadata);
// The timestamp and version columns are not pushed above; presumably they come
// from the defaults of `RegionDescBuilder`.
let expect_schema = schema_util::new_schema_ref(&[
("k1", LogicalTypeId::Int32, false),
("timestamp", LogicalTypeId::UInt64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("v1", LogicalTypeId::Float32, true),
]);
assert_eq!(region_name, region.name());
assert_eq!(expect_schema, *region.in_memory_metadata().schema());
}
}

View File

@@ -0,0 +1,43 @@
use store_api::storage::{SequenceNumber, WriteContext, WriteResponse};
use crate::error::Result;
use crate::memtable::{Inserter, MemtableBuilderRef};
use crate::version::VersionControlRef;
use crate::write_batch::WriteBatch;
/// Writer that inserts write batches into the mutable memtable of a region.
pub struct RegionWriter {
    /// Memtable builder passed in at construction; currently only stored.
    _memtable_builder: MemtableBuilderRef,
    /// Sequence number given to the latest write; starts at 0 and is bumped
    /// by one for every write.
    last_sequence: SequenceNumber,
}
impl RegionWriter {
    /// Creates a writer whose last sequence is 0.
    pub fn new(_memtable_builder: MemtableBuilderRef) -> RegionWriter {
        Self {
            _memtable_builder,
            last_sequence: 0,
        }
    }

    // TODO(yingwen): Support group commit so we can avoid taking mutable reference.
    /// Writes `request` into the mutable memtable of the current version.
    ///
    /// The schema of the batch is not checked here, callers must validate it
    /// before calling this method. Each call consumes one sequence number.
    pub async fn write(
        &mut self,
        _ctx: &WriteContext,
        version_control: &VersionControlRef,
        request: WriteBatch,
    ) -> Result<WriteResponse> {
        // Holding `&mut self` makes this write exclusive: nobody else can
        // modify the version control through this writer at the same time.

        // TODO(yingwen): Write wal and get sequence.
        let current = version_control.current();
        let mutable = current.memtables.mutable_memtable();

        self.last_sequence += 1;
        let sequence = self.last_sequence;

        let mut inserter = Inserter::new(sequence);
        inserter.insert_memtable(&request, &**mutable)?;

        Ok(WriteResponse {})
    }
}

View File

@@ -0,0 +1,26 @@
use async_trait::async_trait;
use store_api::storage::{
GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, Snapshot,
};
use crate::error::{Error, Result};
/// [Snapshot] implementation.
///
/// Placeholder for now: it carries no state.
pub struct SnapshotImpl {}
// Every method below is a stub that panics via `unimplemented!()`.
#[async_trait]
impl Snapshot for SnapshotImpl {
    type Error = Error;

    /// Not implemented yet, panics when called.
    fn schema(&self) -> &SchemaRef {
        unimplemented!()
    }

    /// Not implemented yet, panics when called.
    async fn scan(&self, _ctx: &ReadContext, _request: ScanRequest) -> Result<ScanResponse> {
        unimplemented!()
    }

    /// Not implemented yet, panics when called.
    async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result<GetResponse> {
        unimplemented!()
    }
}

125
src/storage/src/sync.rs Normal file
View File

@@ -0,0 +1,125 @@
//! Synchronization utilities
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, MutexGuard};
use arc_swap::ArcSwap;
/// A thread safe clone-on-write cell.
///
/// Each read returns a read only clone of the internal data and won't block
/// writes. Writing to the cell requires acquiring a write txn (which takes a
/// lock) first, and modifications are not visible to others until the txn is
/// committed.
#[derive(Debug)]
pub struct CowCell<T> {
    /// The published data, replaced when a txn is committed.
    inner: ArcSwap<T>,
    /// Lock held by a write txn for as long as the txn is alive.
    mutex: Mutex<()>,
}
impl<T> CowCell<T> {
    /// Creates a cell that initially holds `data`.
    pub fn new(data: T) -> CowCell<T> {
        let inner = ArcSwap::from(Arc::new(data));
        let mutex = Mutex::new(());

        CowCell { inner, mutex }
    }

    /// Returns a shared, read only handle to the data currently stored in
    /// the cell.
    pub fn get(&self) -> Arc<T> {
        self.inner.load_full()
    }
}
impl<T: Clone> CowCell<T> {
    /// Acquire a write txn, blocking the current thread.
    ///
    /// Note that this will clone the inner data.
    ///
    /// A poisoned lock is recovered instead of panicking: the mutex guards no
    /// data of its own and the cell content is only replaced atomically on
    /// commit, so a thread that panicked while holding a txn cannot leave the
    /// cell in an inconsistent state. Its uncommitted changes are simply lost.
    pub fn lock(&self) -> TxnGuard<T> {
        let _guard = self
            .mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Acquire a clone of data inside lock.
        let data = (*self.get()).clone();

        TxnGuard {
            inner: &self.inner,
            data,
            _guard,
        }
    }
}
/// A RAII implementation of a write transaction of the [CowCell].
///
/// When this txn is dropped (falls out of scope or committed), the lock will be
/// unlocked, but updates to the content won't be visible unless the txn is committed.
#[must_use = "if unused the CowCell will immediately unlock"]
pub struct TxnGuard<'a, T: Clone> {
    /// The cell storage that receives `data` on commit.
    inner: &'a ArcSwap<T>,
    /// Private working copy of the cell data that this txn modifies.
    data: T,
    /// Guard of the cell's write lock, released when the txn is dropped.
    _guard: MutexGuard<'a, ()>,
}
impl<T: Clone> TxnGuard<'_, T> {
/// Commit updates to the cell and release the lock.
pub fn commit(self) {
let data = Arc::new(self.data);
self.inner.store(data);
}
}
impl<T: Clone> Deref for TxnGuard<'_, T> {
    type Target = T;

    /// Borrows the txn's working copy, not the data published in the cell.
    fn deref(&self) -> &T {
        &self.data
    }
}
impl<T: Clone> DerefMut for TxnGuard<'_, T> {
    /// Mutably borrows the txn's working copy; changes stay private to the
    /// txn until it is committed.
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Changes made in a txn become visible only after `commit()`.
    #[test]
    fn test_cow_cell_commit() {
        let cell = CowCell::new(10);
        assert_eq!(10, *cell.get());

        let mut txn = cell.lock();
        assert_eq!(10, *txn);
        // Reading from the cell while the lock is held is fine.
        assert_eq!(10, *cell.get());

        *txn += 2;
        assert_eq!(*txn, 12);
        // Readers still observe the old value.
        assert_eq!(10, *cell.get());

        txn.commit();
        // The committed value is now published.
        assert_eq!(12, *cell.get());
    }

    /// Dropping a txn without committing discards its changes.
    #[test]
    fn test_cow_cell_cancel() {
        let cell = CowCell::new(10);
        assert_eq!(10, *cell.get());

        let mut txn = cell.lock();
        *txn += 2;
        drop(txn);

        // Nothing was committed, so the cell keeps its original value.
        assert_eq!(10, *cell.get());
    }
}

View File

@@ -0,0 +1,3 @@
pub mod descriptor_util;
pub mod schema_util;
pub mod write_batch_util;

View File

@@ -0,0 +1,78 @@
use datatypes::prelude::ConcreteDataType;
use store_api::storage::{
ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId,
RegionDescriptor, RowKeyDescriptorBuilder,
};
use crate::test_util::schema_util::ColumnDef;
/// A RegionDescriptor builder for test.
pub struct RegionDescBuilder {
    /// Name of the region to build.
    name: String,
    /// Last column id handed out; the next column gets `last_column_id + 1`.
    last_column_id: ColumnId,
    /// Builder of the row key (timestamp, key columns, version switch).
    key_builder: RowKeyDescriptorBuilder,
    /// Builder of the default column family that receives the value columns.
    default_cf_builder: ColumnFamilyDescriptorBuilder,
}
impl RegionDescBuilder {
    /// Creates a builder for region `name` with a non-nullable `timestamp`
    /// column of type uint64 whose column id is 2.
    pub fn new<T: Into<String>>(name: T) -> Self {
        let timestamp =
            ColumnDescriptorBuilder::new(2, "timestamp", ConcreteDataType::uint64_datatype())
                .is_nullable(false)
                .build();

        Self {
            name: name.into(),
            last_column_id: 2,
            key_builder: RowKeyDescriptorBuilder::new(timestamp),
            default_cf_builder: ColumnFamilyDescriptorBuilder::new(),
        }
    }

    // This replaces the whole row key builder, so call it right after `new()`,
    // or at least before `push_key_column()` and `enable_version_column()`.
    /// Replaces the timestamp column with one built from `column_def`.
    pub fn timestamp(mut self, column_def: ColumnDef) -> Self {
        let timestamp = self.new_column(column_def);
        self.key_builder = RowKeyDescriptorBuilder::new(timestamp);
        self
    }

    /// Enables or disables the version column of the row key.
    pub fn enable_version_column(mut self, enable: bool) -> Self {
        let key_builder = self.key_builder.enable_version_column(enable);
        self.key_builder = key_builder;
        self
    }

    /// Appends a key column built from `column_def` to the row key.
    pub fn push_key_column(mut self, column_def: ColumnDef) -> Self {
        let key_column = self.new_column(column_def);
        let key_builder = self.key_builder.push_column(key_column);
        self.key_builder = key_builder;
        self
    }

    /// Appends a value column built from `column_def` to the default column family.
    pub fn push_value_column(mut self, column_def: ColumnDef) -> Self {
        let value_column = self.new_column(column_def);
        let cf_builder = self.default_cf_builder.push_column(value_column);
        self.default_cf_builder = cf_builder;
        self
    }

    /// Builds the descriptor; no extra column family is added.
    pub fn build(self) -> RegionDescriptor {
        let Self {
            name,
            key_builder,
            default_cf_builder,
            ..
        } = self;

        RegionDescriptor {
            name,
            row_key: key_builder.build(),
            default_cf: default_cf_builder.build(),
            extra_cfs: Vec::new(),
        }
    }

    /// Returns the next unused column id.
    fn alloc_column_id(&mut self) -> ColumnId {
        let next_id = self.last_column_id + 1;
        self.last_column_id = next_id;
        next_id
    }

    /// Turns a `(name, type, is_nullable)` definition into a column
    /// descriptor with a newly allocated column id.
    fn new_column(&mut self, column_def: ColumnDef) -> ColumnDescriptor {
        let (name, type_id, is_nullable) = column_def;
        let datatype = type_id.data_type();
        let column_id = self.alloc_column_id();

        ColumnDescriptorBuilder::new(column_id, name, datatype)
            .is_nullable(is_nullable)
            .build()
    }
}

View File

@@ -0,0 +1,23 @@
use std::sync::Arc;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
/// Column definition: (name, datatype, is_nullable)
///
/// A compact tuple form used by the test helpers to describe one column.
pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool);
/// Creates a [Schema] with one column per entry of `column_defs`, in the
/// same order.
pub fn new_schema(column_defs: &[ColumnDef]) -> Schema {
    let column_schemas = column_defs
        .iter()
        .map(|(name, type_id, is_nullable)| {
            ColumnSchema::new(*name, type_id.data_type(), *is_nullable)
        })
        .collect();

    Schema::new(column_schemas)
}
/// Same as [new_schema], but returns the schema wrapped in an `Arc`.
pub fn new_schema_ref(column_defs: &[ColumnDef]) -> SchemaRef {
    let schema = new_schema(column_defs);
    Arc::new(schema)
}

View File

@@ -0,0 +1,10 @@
use store_api::storage::WriteRequest;
use crate::test_util::schema_util::{self, ColumnDef};
use crate::write_batch::WriteBatch;
/// Creates an empty [WriteBatch] whose schema is described by `column_defs`.
pub fn new_write_batch(column_defs: &[ColumnDef]) -> WriteBatch {
    WriteBatch::new(schema_util::new_schema_ref(column_defs))
}

View File

@@ -0,0 +1,60 @@
use std::sync::Arc;
use crate::memtable::MemtableSet;
use crate::metadata::{RegionMetadata, RegionMetadataRef};
use crate::sync::CowCell;
/// Controls version of in memory state for a region.
pub struct VersionControl {
    /// Current version, kept in a [CowCell] so readers get a shared snapshot.
    version: CowCell<Version>,
}
impl VersionControl {
/// Construct a new version control from `metadata`.
pub fn new(metadata: RegionMetadata, memtables: MemtableSet) -> VersionControl {
VersionControl {
version: CowCell::new(Version::new(metadata, memtables)),
}
}
/// Returns current version.
pub fn current(&self) -> VersionRef {
self.version.get()
}
/// Metadata of current version.
pub fn metadata(&self) -> RegionMetadataRef {
let version = self.current();
version.metadata.clone()
}
}
/// Shared reference to a [VersionControl].
pub type VersionControlRef = Arc<VersionControl>;
/// Shared reference to a [Version].
pub type VersionRef = Arc<Version>;
// To read data from a version, callers need to
// 1. acquire the version first
// 2. acquire the sequence later
//
// Reason: data may be flushed and some data with an old sequence may be removed,
// so the version must be acquired first.

/// Version contains metadata and state of region.
pub struct Version {
    /// Metadata of the region. Altering metadata isn't frequent, storing metadata
    /// in Arc to allow sharing metadata and reuse metadata when creating a new
    /// `Version`.
    metadata: RegionMetadataRef,
    /// Memtables of this version.
    pub memtables: MemtableSet,
    // TODO(yingwen): Also need to store last sequence to this version when switching
    // version, so we can know the newest data can read from this version.
}
impl Version {
    /// Creates a version from `metadata` (moved into an `Arc`) and `memtables`.
    pub fn new(metadata: RegionMetadata, memtables: MemtableSet) -> Version {
        let metadata = Arc::new(metadata);

        Version {
            metadata,
            memtables,
        }
    }
}

View File

@@ -0,0 +1,442 @@
use std::any::Any;
use std::collections::HashMap;
use std::slice;
use common_error::prelude::*;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
use snafu::ensure;
use store_api::storage::{consts, PutOperation, WriteRequest};
/// Errors raised while building or validating a write batch.
#[derive(Debug, Snafu)]
pub enum Error {
    // The same column name was added twice to one `PutData`.
    #[snafu(display("Duplicate column {} in same request", name))]
    DuplicateColumn { name: String, backtrace: Backtrace },

    // A non-nullable column of the schema is absent from the put data.
    #[snafu(display("Missing column {} in request", name))]
    MissingColumn { name: String, backtrace: Backtrace },

    // The data type of a column differs from the type declared in the schema.
    #[snafu(display(
        "Type of column {} does not match type in schema, expect {:?}, given {:?}",
        name,
        expect,
        given
    ))]
    TypeMismatch {
        name: String,
        expect: ConcreteDataType,
        given: ConcreteDataType,
        backtrace: Backtrace,
    },

    // A non-nullable column contains null values.
    #[snafu(display("Column {} is not null but input has null", name))]
    HasNull { name: String, backtrace: Backtrace },

    // The put data has a column that does not exist in the schema.
    #[snafu(display("Unknown column {}", name))]
    UnknownColumn { name: String, backtrace: Backtrace },

    // A column has a different number of rows than the columns already added.
    #[snafu(display(
        "Length of column {} not equals to other columns, expect {}, given {}",
        name,
        expect,
        given
    ))]
    LenNotEquals {
        name: String,
        expect: usize,
        given: usize,
        backtrace: Backtrace,
    },

    // The batch would hold more than `MAX_BATCH_SIZE` rows.
    #[snafu(display(
        "Request is too large, max is {}, current is {}",
        MAX_BATCH_SIZE,
        num_rows
    ))]
    RequestTooLarge {
        num_rows: usize,
        backtrace: Backtrace,
    },
}
/// Result type of write batch operations, with [Error] as the error type.
pub type Result<T> = std::result::Result<T, Error>;

/// Max number of updates of a write batch.
const MAX_BATCH_SIZE: usize = 1_000_000;
impl ErrorExt for Error {
    /// Every write batch error is reported as `InvalidArguments`.
    fn status_code(&self) -> StatusCode {
        StatusCode::InvalidArguments
    }

    /// Returns the backtrace captured by the error, if any.
    fn backtrace_opt(&self) -> Option<&Backtrace> {
        ErrorCompat::backtrace(self)
    }

    /// Exposes the error as `Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Implementation of [WriteRequest].
pub struct WriteBatch {
    /// Schema that every put of this batch is validated against.
    schema: SchemaRef,
    /// Mutations of this batch, in the order they were added.
    mutations: Vec<Mutation>,
    /// Total number of rows of all mutations, at most `MAX_BATCH_SIZE`.
    num_rows: usize,
}
impl WriteRequest for WriteBatch {
    type Error = Error;
    type PutOp = PutData;

    /// Creates an empty batch for the given `schema`.
    fn new(schema: SchemaRef) -> Self {
        let mutations = Vec::new();

        Self {
            schema,
            mutations,
            num_rows: 0,
        }
    }

    /// Appends `data` to the batch after validating it against the schema
    /// and the batch size limit.
    ///
    /// Empty data is accepted and ignored without any validation.
    fn put(&mut self, data: PutData) -> Result<()> {
        if !data.is_empty() {
            self.validate_put(&data)?;
            self.add_num_rows(data.num_rows())?;
            self.mutations.push(Mutation::Put(data));
        }

        Ok(())
    }
}
// WriteBatch pub methods.
impl WriteBatch {
    /// Returns the schema of this batch.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Returns an iterator over the mutations of this batch.
    pub fn iter(&self) -> slice::Iter<'_, Mutation> {
        self.mutations.iter()
    }

    /// Returns true if the batch holds no mutation.
    pub fn is_empty(&self) -> bool {
        self.mutations.is_empty()
    }
}
/// A single update stored in a [WriteBatch].
pub enum Mutation {
    /// Put the rows carried by the [PutData].
    Put(PutData),
}
/// Rows to put, stored column by column.
#[derive(Default)]
pub struct PutData {
    /// Column name to column data; all columns have the same length.
    columns: HashMap<String, VectorRef>,
}
impl PutOperation for PutData {
    type Error = Error;

    /// Creates an empty `PutData`.
    fn new() -> PutData {
        Self::default()
    }

    /// Creates an empty `PutData` with room for `num_columns` columns.
    fn with_num_columns(num_columns: usize) -> PutData {
        let columns = HashMap::with_capacity(num_columns);
        PutData { columns }
    }

    /// Adds a key column named `name`.
    fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<()> {
        self.add_column_by_name(name, vector)
    }

    /// Adds the version column, stored under the reserved version column name.
    fn add_version_column(&mut self, vector: VectorRef) -> Result<()> {
        // TODO(yingwen): Maybe ensure that version column must be a uint64 vector.
        let version_column = consts::VERSION_COLUMN_NAME;
        self.add_column_by_name(version_column, vector)
    }

    /// Adds a value column named `name`.
    fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<()> {
        self.add_column_by_name(name, vector)
    }
}
// PutData pub methods.
impl PutData {
    /// Looks up the column named `name`.
    pub fn column_by_name(&self, name: &str) -> Option<&VectorRef> {
        self.columns.get(name)
    }

    /// Returns number of rows in data, which is 0 when there is no column.
    pub fn num_rows(&self) -> usize {
        // All columns share one length, so any column can answer.
        match self.columns.values().next() {
            Some(col) => col.len(),
            None => 0,
        }
    }

    /// Returns true if no rows in data.
    ///
    /// `PutData` with empty column will also be considered as empty.
    pub fn is_empty(&self) -> bool {
        let rows = self.num_rows();
        rows == 0
    }
}
impl WriteBatch {
    /// Checks `data` against the schema of this batch.
    ///
    /// For every schema column present in `data`, the data type must match
    /// and a non-nullable column must not contain nulls. A schema column that
    /// is absent from `data` must be nullable. Finally every column of `data`
    /// must exist in the schema.
    fn validate_put(&self, data: &PutData) -> Result<()> {
        for column_schema in self.schema.column_schemas() {
            if let Some(col) = data.column_by_name(&column_schema.name) {
                let given = col.data_type();
                ensure!(
                    given == column_schema.data_type,
                    TypeMismatchSnafu {
                        name: &column_schema.name,
                        expect: column_schema.data_type.clone(),
                        given,
                    }
                );

                let null_allowed = column_schema.is_nullable || col.null_count() == 0;
                ensure!(
                    null_allowed,
                    HasNullSnafu {
                        name: &column_schema.name,
                    }
                );
            } else {
                // Only nullable columns may be left out.
                ensure!(
                    column_schema.is_nullable,
                    MissingColumnSnafu {
                        name: &column_schema.name,
                    }
                );
            }
        }

        // Columns that the schema does not know about are rejected.
        for name in data.columns.keys() {
            let in_schema = self.schema.column_schema_by_name(name).is_some();
            ensure!(in_schema, UnknownColumnSnafu { name });
        }

        Ok(())
    }

    /// Adds `len` rows to the row counter of the batch, failing with
    /// `RequestTooLarge` (and leaving the counter untouched) if the total
    /// would exceed `MAX_BATCH_SIZE`.
    fn add_num_rows(&mut self, len: usize) -> Result<()> {
        let total = self.num_rows + len;
        ensure!(
            total <= MAX_BATCH_SIZE,
            RequestTooLargeSnafu { num_rows: total }
        );

        self.num_rows = total;
        Ok(())
    }
}
/// Allows iterating the mutations of a batch with `for mutation in &batch`.
impl<'a> IntoIterator for &'a WriteBatch {
    type Item = &'a Mutation;
    type IntoIter = slice::Iter<'a, Mutation>;

    fn into_iter(self) -> slice::Iter<'a, Mutation> {
        self.mutations.iter()
    }
}
impl PutData {
    /// Stores `vector` under `name`.
    ///
    /// Fails with `DuplicateColumn` if a column with this name was already
    /// added, or with `LenNotEquals` if the length of `vector` differs from
    /// the length of the columns added before.
    fn add_column_by_name(&mut self, name: &str, vector: VectorRef) -> Result<()> {
        let is_new = !self.columns.contains_key(name);
        ensure!(is_new, DuplicateColumnSnafu { name });

        // Any existing column gives the expected length.
        let expected_len = self.columns.values().next().map(|col| col.len());
        if let Some(expect) = expected_len {
            let given = vector.len();
            ensure!(
                expect == given,
                LenNotEqualsSnafu {
                    name,
                    expect,
                    given,
                }
            );
        }

        self.columns.insert(name.to_string(), vector);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::Arc;

    use datatypes::type_id::LogicalTypeId;
    use datatypes::vectors::{BooleanVector, Int32Vector, UInt64Vector};

    use super::*;
    use crate::test_util::write_batch_util;

    /// Columns added to a `PutData` can be looked up by name and determine
    /// the number of rows.
    #[test]
    fn test_put_data_basic() {
        let mut put_data = PutData::new();
        assert!(put_data.is_empty());

        let vector1 = Arc::new(Int32Vector::from_slice(&[1, 2, 3, 4, 5]));
        let vector2 = Arc::new(UInt64Vector::from_slice(&[0, 2, 4, 6, 8]));

        put_data.add_key_column("k1", vector1.clone()).unwrap();
        put_data.add_version_column(vector2).unwrap();
        put_data.add_value_column("v1", vector1).unwrap();

        assert_eq!(5, put_data.num_rows());
        assert!(!put_data.is_empty());

        assert!(put_data.column_by_name("no such column").is_none());
        assert!(put_data.column_by_name("k1").is_some());
        assert!(put_data.column_by_name("v1").is_some());
        assert!(put_data
            .column_by_name(consts::VERSION_COLUMN_NAME)
            .is_some());
    }

    /// A `PutData` whose only column has no rows is still considered empty.
    #[test]
    fn test_put_data_empty_vector() {
        let mut put_data = PutData::with_num_columns(1);
        assert!(put_data.is_empty());

        let vector1 = Arc::new(Int32Vector::from_slice(&[]));
        put_data.add_key_column("k1", vector1).unwrap();

        assert_eq!(0, put_data.num_rows());
        assert!(put_data.is_empty());
    }

    /// Creates a batch with a non-null `k1`, a non-null version column and a
    /// nullable `v1`.
    fn new_test_batch() -> WriteBatch {
        write_batch_util::new_write_batch(&[
            ("k1", LogicalTypeId::UInt64, false),
            (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
            ("v1", LogicalTypeId::Boolean, true),
        ])
    }

    /// Valid put data is accepted and can be read back through `iter()`.
    #[test]
    fn test_write_batch_put() {
        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
        let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));

        let mut put_data = PutData::new();
        put_data.add_key_column("k1", intv.clone()).unwrap();
        put_data.add_version_column(intv).unwrap();
        put_data.add_value_column("v1", boolv).unwrap();

        let mut batch = new_test_batch();
        assert!(batch.is_empty());
        batch.put(put_data).unwrap();
        assert!(!batch.is_empty());

        let mut iter = batch.iter();
        let Mutation::Put(put_data) = iter.next().unwrap();
        assert_eq!(3, put_data.num_rows());
    }

    /// Asserts that `err` is an invalid-arguments error that carries a
    /// backtrace and whose message contains `msg`.
    fn check_err(err: Error, msg: &str) {
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
        assert!(err.backtrace_opt().is_some());
        assert!(err.to_string().contains(msg));
    }

    /// Putting one row more than `MAX_BATCH_SIZE` is rejected.
    #[test]
    fn test_write_batch_too_large() {
        let boolv = Arc::new(BooleanVector::from_iter(
            iter::repeat(Some(true)).take(MAX_BATCH_SIZE + 1),
        ));

        let mut put_data = PutData::new();
        put_data.add_key_column("k1", boolv).unwrap();

        let mut batch = write_batch_util::new_write_batch(&[("k1", LogicalTypeId::Boolean, false)]);
        let err = batch.put(put_data).err().unwrap();
        check_err(err, "Request is too large");
    }

    /// Adding the same column name twice is rejected.
    #[test]
    fn test_put_data_duplicate() {
        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));

        let mut put_data = PutData::new();
        put_data.add_key_column("k1", intv.clone()).unwrap();
        let err = put_data.add_key_column("k1", intv).err().unwrap();
        check_err(err, "Duplicate column k1");
    }

    /// Adding a column whose length differs from the existing one is rejected.
    #[test]
    fn test_put_data_different_len() {
        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
        let boolv = Arc::new(BooleanVector::from(vec![true, false]));

        let mut put_data = PutData::new();
        put_data.add_key_column("k1", intv).unwrap();
        let err = put_data.add_value_column("v1", boolv).err().unwrap();
        check_err(err, "Length of column v1 not equals");
    }

    /// `k1` is declared as UInt64 in the schema, a boolean vector is rejected.
    #[test]
    fn test_put_type_mismatch() {
        let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));

        let mut put_data = PutData::new();
        put_data.add_key_column("k1", boolv).unwrap();

        let mut batch = new_test_batch();
        let err = batch.put(put_data).err().unwrap();
        check_err(err, "Type of column k1 does not match");
    }

    /// `k1` is not nullable, so a vector containing a null is rejected.
    #[test]
    fn test_put_type_has_null() {
        let intv = Arc::new(UInt64Vector::from_iter(&[Some(1), None, Some(3)]));

        let mut put_data = PutData::new();
        put_data.add_key_column("k1", intv).unwrap();

        let mut batch = new_test_batch();
        let err = batch.put(put_data).err().unwrap();
        check_err(err, "Column k1 is not null");
    }

    /// The put data only provides `v1`, so the non-nullable `k1` is missing.
    #[test]
    fn test_put_missing_column() {
        let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));

        let mut put_data = PutData::new();
        put_data.add_key_column("v1", boolv).unwrap();

        let mut batch = new_test_batch();
        let err = batch.put(put_data).err().unwrap();
        check_err(err, "Missing column k1");
    }

    /// `v2` is not part of the schema and is rejected.
    #[test]
    fn test_put_unknown_column() {
        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
        let boolv = Arc::new(BooleanVector::from(vec![true, false, true]));

        let mut put_data = PutData::new();
        put_data.add_key_column("k1", intv.clone()).unwrap();
        put_data.add_version_column(intv).unwrap();
        put_data.add_value_column("v1", boolv.clone()).unwrap();
        put_data.add_value_column("v2", boolv).unwrap();

        let mut batch = new_test_batch();
        let err = batch.put(put_data).err().unwrap();
        check_err(err, "Unknown column v2");
    }
}

View File

@@ -13,4 +13,4 @@ futures = "0.3"
[dev-dependencies]
async-stream = "0.3"
tokio = { version = "1.18", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }

View File

@@ -1,22 +1,27 @@
//! Storage APIs.
mod column_family;
pub mod consts;
mod descriptors;
mod engine;
mod metadata;
mod region;
mod requests;
mod responses;
mod snapshot;
mod types;
pub use datatypes::data_type::ConcreteDataType;
pub use datatypes::schema::SchemaRef;
pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
pub use self::column_family::ColumnFamily;
pub use self::descriptors::{
ColumnDescriptor, ColumnFamilyDescriptor, KeyDescriptor, RegionDescriptor,
ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor,
ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RowKeyDescriptor,
RowKeyDescriptorBuilder,
};
pub use self::engine::{EngineContext, StorageEngine};
pub use self::metadata::RegionMeta;
pub use self::region::{Region, WriteContext};
pub use self::requests::{GetRequest, ScanRequest, WriteRequest};
pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest};
pub use self::responses::{GetResponse, ScanResponse, WriteResponse};
pub use self::snapshot::{ReadContext, Snapshot};
pub use self::types::{SequenceNumber, ValueType};

View File

@@ -1,4 +0,0 @@
/// A group of value columns.
pub trait ColumnFamily: Send + Sync + Clone {
    /// Returns the name of this column family.
    fn name(&self) -> &str;
}

View File

@@ -0,0 +1,27 @@
//! Constants.
use crate::storage::descriptors::{ColumnFamilyId, ColumnId};
// Ids reserved for internal column families:
/// Column family Id for row key columns.
///
/// This is a virtual column family: row key columns are not actually
/// stored in any column family.
pub const KEY_CF_ID: ColumnFamilyId = 0;
/// Id for default column family.
pub const DEFAULT_CF_ID: ColumnFamilyId = 1;
// Ids reserved for internal columns:
// TODO(yingwen): Reserve one bit for internal columns.
/// Column id for version column.
pub const VERSION_COLUMN_ID: ColumnId = 1;
// Names reserved for internal columns:
/// Name of version column.
pub const VERSION_COLUMN_NAME: &str = "__version";
// Names for default column family.
pub const DEFAULT_CF_NAME: &str = "default";

View File

@@ -1,17 +1,38 @@
use crate::storage::ConcreteDataType;
use datatypes::value::Value;
/// A [ColumnDescriptor] contains information about a column.
#[derive(Debug)]
use crate::storage::{consts, ColumnSchema, ConcreteDataType};
/// Id of column, unique in each region.
pub type ColumnId = u32;
/// Id of column family, unique in each region.
pub type ColumnFamilyId = u32;
// TODO(yingwen): Validate default value has same type with column, and name is a valid column name.
/// A [ColumnDescriptor] contains information to create a column.
#[derive(Debug, Clone)]
pub struct ColumnDescriptor {
    /// Id of the column.
    pub id: ColumnId,
    /// Name of the column.
    pub name: String,
    /// Data type of the column.
    pub data_type: ConcreteDataType,
    /// Is column nullable, default is true.
    pub is_nullable: bool,
    /// Default value of column, default is None, which means no default value
    /// for this column, and user must provide value for a not-null column.
    pub default_value: Option<Value>,
    /// Comment of the column, empty by default.
    pub comment: String,
}
/// A [KeyDescriptor] contains information about a row key.
#[derive(Debug)]
pub struct KeyDescriptor {
/// Builds a column schema from the name, data type and nullability of a
/// column descriptor.
impl From<&ColumnDescriptor> for ColumnSchema {
    fn from(desc: &ColumnDescriptor) -> ColumnSchema {
        let data_type = desc.data_type.clone();
        ColumnSchema::new(&desc.name, data_type, desc.is_nullable)
    }
}
/// A [RowKeyDescriptor] contains information about row key.
#[derive(Debug, Clone)]
pub struct RowKeyDescriptor {
pub columns: Vec<ColumnDescriptor>,
/// Timestamp key column.
pub timestamp: ColumnDescriptor,
/// Enable version column in row key if this field is true.
///
@@ -19,21 +40,250 @@ pub struct KeyDescriptor {
pub enable_version_column: bool,
}
/// A [ColumnFamilyDescriptor] contains information about a column family.
#[derive(Debug)]
/// A [ColumnFamilyDescriptor] contains information to create a column family.
#[derive(Debug, Clone)]
pub struct ColumnFamilyDescriptor {
    /// Id of the column family.
    pub cf_id: ColumnFamilyId,
    /// Name of the column family.
    pub name: String,
    /// Descriptors of columns in this column family.
    pub columns: Vec<ColumnDescriptor>,
}
/// A [RegionDescriptor] contains information about a region.
#[derive(Debug)]
/// A [RegionDescriptor] contains information to create a region.
#[derive(Debug, Clone)]
pub struct RegionDescriptor {
/// Region name.
pub name: String,
/// Row key descriptor of this region.
pub key: KeyDescriptor,
pub row_key: RowKeyDescriptor,
/// Default column family.
pub default_cf: ColumnFamilyDescriptor,
/// Extra column families defined by user.
pub extra_cfs: Vec<ColumnFamilyDescriptor>,
}
/// Builder of [ColumnDescriptor].
///
/// Columns are nullable, have no default value and an empty comment unless
/// configured otherwise.
pub struct ColumnDescriptorBuilder {
    id: ColumnId,
    name: String,
    data_type: ConcreteDataType,
    is_nullable: bool,
    default_value: Option<Value>,
    comment: String,
}
impl ColumnDescriptorBuilder {
    /// Starts building a nullable column without default value and comment.
    pub fn new<T: Into<String>>(id: ColumnId, name: T, data_type: ConcreteDataType) -> Self {
        let name = name.into();

        Self {
            id,
            name,
            data_type,
            is_nullable: true,
            default_value: None,
            comment: "".to_string(),
        }
    }

    /// Sets whether the column is nullable.
    pub fn is_nullable(self, is_nullable: bool) -> Self {
        Self {
            is_nullable,
            ..self
        }
    }

    /// Sets the default value of the column, `None` means no default value.
    pub fn default_value(self, value: Option<Value>) -> Self {
        Self {
            default_value: value,
            ..self
        }
    }

    /// Sets the comment of the column.
    pub fn comment<T: Into<String>>(self, comment: T) -> Self {
        Self {
            comment: comment.into(),
            ..self
        }
    }

    /// Consumes the builder and returns the column descriptor.
    pub fn build(self) -> ColumnDescriptor {
        let Self {
            id,
            name,
            data_type,
            is_nullable,
            default_value,
            comment,
        } = self;

        ColumnDescriptor {
            id,
            name,
            data_type,
            is_nullable,
            default_value,
            comment,
        }
    }
}
/// Builder of [RowKeyDescriptor].
///
/// The version column is enabled unless configured otherwise.
pub struct RowKeyDescriptorBuilder {
    columns: Vec<ColumnDescriptor>,
    timestamp: ColumnDescriptor,
    enable_version_column: bool,
}
impl RowKeyDescriptorBuilder {
    /// Starts building a row key with the given `timestamp` column, no other
    /// key column and the version column enabled.
    pub fn new(timestamp: ColumnDescriptor) -> Self {
        let columns = Vec::new();

        Self {
            columns,
            timestamp,
            enable_version_column: true,
        }
    }

    /// Reserves room for at least `capacity` more key columns.
    pub fn columns_capacity(self, capacity: usize) -> Self {
        let mut builder = self;
        builder.columns.reserve(capacity);
        builder
    }

    /// Appends a key column.
    pub fn push_column(self, column: ColumnDescriptor) -> Self {
        let mut builder = self;
        builder.columns.push(column);
        builder
    }

    /// Sets whether the row key has a version column.
    pub fn enable_version_column(self, enable: bool) -> Self {
        Self {
            enable_version_column: enable,
            ..self
        }
    }

    /// Consumes the builder and returns the row key descriptor.
    pub fn build(self) -> RowKeyDescriptor {
        let Self {
            columns,
            timestamp,
            enable_version_column,
        } = self;

        RowKeyDescriptor {
            columns,
            timestamp,
            enable_version_column,
        }
    }
}
/// Builder of [ColumnFamilyDescriptor].
///
/// Uses the id and name of the default column family unless configured
/// otherwise.
pub struct ColumnFamilyDescriptorBuilder {
    cf_id: ColumnFamilyId,
    name: String,
    columns: Vec<ColumnDescriptor>,
}
impl ColumnFamilyDescriptorBuilder {
    /// Starts building a column family with the default id, the default
    /// name and no columns.
    pub fn new() -> Self {
        let name = consts::DEFAULT_CF_NAME.to_string();

        Self {
            cf_id: consts::DEFAULT_CF_ID,
            name,
            columns: Vec::new(),
        }
    }

    /// Sets the id of the column family.
    pub fn cf_id(self, cf_id: ColumnFamilyId) -> Self {
        Self { cf_id, ..self }
    }

    /// Sets the name of the column family.
    pub fn name<T: Into<String>>(self, name: T) -> Self {
        Self {
            name: name.into(),
            ..self
        }
    }

    /// Reserves room for at least `capacity` more columns.
    pub fn columns_capacity(self, capacity: usize) -> Self {
        let mut builder = self;
        builder.columns.reserve(capacity);
        builder
    }

    /// Appends a column to the column family.
    pub fn push_column(self, column: ColumnDescriptor) -> Self {
        let mut builder = self;
        builder.columns.push(column);
        builder
    }

    /// Consumes the builder and returns the column family descriptor.
    pub fn build(self) -> ColumnFamilyDescriptor {
        let Self {
            cf_id,
            name,
            columns,
        } = self;

        ColumnFamilyDescriptor {
            cf_id,
            name,
            columns,
        }
    }
}
impl Default for ColumnFamilyDescriptorBuilder {
fn default() -> ColumnFamilyDescriptorBuilder {
ColumnFamilyDescriptorBuilder::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a builder for an int32 column named `test` with id 3.
    fn new_column_desc_builder() -> ColumnDescriptorBuilder {
        ColumnDescriptorBuilder::new(3, "test", ConcreteDataType::int32_datatype())
    }

    /// Checks the defaults of the column builder and each of its setters.
    #[test]
    fn test_column_descriptor_builder() {
        let desc = new_column_desc_builder().build();
        assert_eq!(3, desc.id);
        assert_eq!("test", desc.name);
        assert_eq!(ConcreteDataType::int32_datatype(), desc.data_type);
        assert!(desc.is_nullable);
        assert!(desc.default_value.is_none());
        assert!(desc.comment.is_empty());

        let desc = new_column_desc_builder().is_nullable(false).build();
        assert!(!desc.is_nullable);

        let desc = new_column_desc_builder()
            .default_value(Some(Value::Null))
            .build();
        assert_eq!(Value::Null, desc.default_value.unwrap());

        let desc = new_column_desc_builder()
            .default_value(Some(Value::Int32(123)))
            .build();
        assert_eq!(Value::Int32(123), desc.default_value.unwrap());

        let desc = new_column_desc_builder().comment("A test column").build();
        assert_eq!("A test column", desc.comment);
    }

    /// Returns an int64 `timestamp` column descriptor with id 5.
    fn new_timestamp_desc() -> ColumnDescriptor {
        ColumnDescriptorBuilder::new(5, "timestamp", ConcreteDataType::int64_datatype()).build()
    }

    /// Checks the defaults of the row key builder, pushing columns and
    /// disabling the version column.
    #[test]
    fn test_row_key_descriptor_builder() {
        let timestamp = new_timestamp_desc();

        let desc = RowKeyDescriptorBuilder::new(timestamp.clone()).build();
        assert!(desc.columns.is_empty());
        assert!(desc.enable_version_column);

        // The reserved capacity is only a hint, more columns can be pushed.
        let desc = RowKeyDescriptorBuilder::new(timestamp.clone())
            .columns_capacity(1)
            .push_column(
                ColumnDescriptorBuilder::new(6, "c1", ConcreteDataType::int32_datatype()).build(),
            )
            .push_column(
                ColumnDescriptorBuilder::new(7, "c2", ConcreteDataType::int32_datatype()).build(),
            )
            .build();
        assert_eq!(2, desc.columns.len());
        assert!(desc.enable_version_column);

        let desc = RowKeyDescriptorBuilder::new(timestamp)
            .enable_version_column(false)
            .build();
        assert!(desc.columns.is_empty());
        assert!(!desc.enable_version_column);
    }

    /// Checks the defaults of the column family builder and its setters.
    #[test]
    fn test_cf_descriptor_builder() {
        let desc = ColumnFamilyDescriptorBuilder::default().build();
        assert_eq!(consts::DEFAULT_CF_ID, desc.cf_id);
        assert_eq!(consts::DEFAULT_CF_NAME, desc.name);
        assert!(desc.columns.is_empty());

        let desc = ColumnFamilyDescriptorBuilder::new()
            .cf_id(32)
            .name("cf1")
            .build();
        assert_eq!(32, desc.cf_id);
        assert_eq!("cf1", desc.name);

        let desc = ColumnFamilyDescriptorBuilder::new()
            .push_column(
                ColumnDescriptorBuilder::new(6, "c1", ConcreteDataType::int32_datatype()).build(),
            )
            .build();
        assert_eq!(1, desc.columns.len());
    }
}

View File

@@ -4,33 +4,52 @@
//! a [`StorageEngine`] instance manages a bunch of storage unit called [`Region`], which holds
//! chunks of rows, support operations like PUT/DELETE/SCAN.
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use crate::storage::descriptors::RegionDescriptor;
use crate::storage::region::Region;
/// Storage engine provides primitive operations to store and access data.
#[async_trait]
pub trait StorageEngine: Send + Sync + Clone {
type Error: ErrorExt + Send + Sync;
type Region: Region;
/// Open an existing region.
fn open_region(&self, ctx: &EngineContext, name: &str) -> Result<Self::Region, Self::Error>;
/// Opens an existing region.
async fn open_region(
&self,
ctx: &EngineContext,
name: &str,
) -> Result<Self::Region, Self::Error>;
/// Close given region.
fn close_region(&self, ctx: &EngineContext, region: Self::Region) -> Result<(), Self::Error>;
/// Closes given region.
async fn close_region(
&self,
ctx: &EngineContext,
region: Self::Region,
) -> Result<(), Self::Error>;
/// Create and return a new region.
fn create_region(
/// Creates and returns the created region.
///
/// Returns the existing region if a region with the same name already exists. The region will
/// be opened before returning.
async fn create_region(
&self,
ctx: &EngineContext,
descriptor: RegionDescriptor,
) -> Result<Self::Region, Self::Error>;
/// Drop given region.
fn drop_region(&self, ctx: &EngineContext, region: Self::Region) -> Result<(), Self::Error>;
/// Drops given region.
///
/// The region will be closed before dropping.
async fn drop_region(
&self,
ctx: &EngineContext,
region: Self::Region,
) -> Result<(), Self::Error>;
/// Return the opened region with given name.
/// Returns the opened region with given name.
fn get_region(
&self,
ctx: &EngineContext,
@@ -39,5 +58,5 @@ pub trait StorageEngine: Send + Sync + Clone {
}
/// Storage engine context.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct EngineContext {}

View File

@@ -0,0 +1,7 @@
use crate::storage::SchemaRef;
/// Metadata of a region.
///
/// Implementations must be `Send + Sync`.
pub trait RegionMeta: Send + Sync {
    /// Returns the schema of the region.
    fn schema(&self) -> &SchemaRef;
}

View File

@@ -1,5 +1,5 @@
//! Region holds chunks of rows stored in the storage engine, but does not require that
//! rows must have continuous primary key range, which is implementation sepecific.
//! rows must have continuous primary key range, which is implementation specific.
//!
//! Regions support operations like PUT/DELETE/SCAN that most key-value stores provide.
//! However, unlike key-value store, data stored in region has data model like:
@@ -9,37 +9,39 @@
//! ```
//!
//! The data model require each row
//! - has 0 ~ m key column
//! - **MUST** has a timestamp column
//! - has a version column
//! - has 0 ~ n value column
//! - has 0 ~ m key column, parts of row key columns;
//! - **MUST** has a timestamp column, part of row key columns;
//! - has a version column, part of row key columns;
//! - has 0 ~ n value column.
//!
//! Each row is identify by (value of key columns, timestamp, version), which forms
//! Each row is identified by (value of key columns, timestamp, version), which forms
//! a row key. Note that the implementation may allow multiple rows to have the same row
//! key (like ClickHouse), which is useful is analytic scenario.
//! key (like ClickHouse), which is useful in analytic scenarios.
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use crate::storage::column_family::ColumnFamily;
use crate::storage::metadata::RegionMeta;
use crate::storage::requests::WriteRequest;
use crate::storage::responses::WriteResponse;
use crate::storage::snapshot::{ReadContext, Snapshot};
use crate::storage::SchemaRef;
/// Chunks of rows in storage engine.
#[async_trait]
pub trait Region: Send + Sync + Clone {
type Error: ErrorExt + Send + Sync;
type Meta: RegionMeta;
type WriteRequest: WriteRequest;
type ColumnFamily: ColumnFamily;
type Snapshot: Snapshot;
fn schema(&self) -> &SchemaRef;
/// Returns name of the region.
fn name(&self) -> &str;
/// List all column families.
fn list_cf(&self) -> Result<Vec<Self::ColumnFamily>, Self::Error>;
/// Returns the in memory metadata of this region.
fn in_memory_metadata(&self) -> Self::Meta;
/// Write updates to region.
fn write(
async fn write(
&self,
ctx: &WriteContext,
request: Self::WriteRequest,

View File

@@ -1,8 +1,30 @@
use crate::storage::column_family::ColumnFamily;
use common_error::ext::ErrorExt;
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
/// Write request holds a collection of updates to apply to a region.
pub trait WriteRequest: Send {
type ColumnFamily: ColumnFamily;
type Error: ErrorExt + Send + Sync;
type PutOp: PutOperation;
fn new(schema: SchemaRef) -> Self;
fn put(&mut self, put: Self::PutOp) -> Result<(), Self::Error>;
}
/// Put multiple rows.
///
/// Columns are supplied as vectors: key and value columns are added by name, while the
/// version column is added without a name.
pub trait PutOperation {
/// Error type returned when adding a column fails.
type Error: ErrorExt + Send + Sync;
/// Creates a new put operation.
fn new() -> Self;
/// Creates a new put operation given an expected number of columns.
///
/// NOTE(review): presumably `num_columns` is only a capacity hint — confirm against
/// the implementors.
fn with_num_columns(num_columns: usize) -> Self;
/// Adds a key column named `name` whose data is held in `vector`.
fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
/// Adds the version column whose data is held in `vector`.
fn add_version_column(&mut self, vector: VectorRef) -> Result<(), Self::Error>;
/// Adds a value column named `name` whose data is held in `vector`.
fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
}
#[derive(Debug)]

View File

@@ -1,23 +1,25 @@
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use datatypes::schema::SchemaRef;
use crate::storage::column_family::ColumnFamily;
use crate::storage::requests::{GetRequest, ScanRequest};
use crate::storage::responses::{GetResponse, ScanResponse};
/// A consistent read-only view of region.
#[async_trait]
pub trait Snapshot: Send + Sync {
type Error: ErrorExt + Send + Sync;
type ColumnFamily: ColumnFamily;
fn schema(&self) -> &SchemaRef;
fn scan(&self, ctx: &ReadContext, request: ScanRequest) -> Result<ScanResponse, Self::Error>;
async fn scan(
&self,
ctx: &ReadContext,
request: ScanRequest,
) -> Result<ScanResponse, Self::Error>;
fn get(&self, ctx: &ReadContext, request: GetRequest) -> Result<GetResponse, Self::Error>;
/// List all column families.
fn list_cf(&self) -> Result<Vec<Self::ColumnFamily>, Self::Error>;
async fn get(&self, ctx: &ReadContext, request: GetRequest)
-> Result<GetResponse, Self::Error>;
}
/// Context for read.

View File

@@ -0,0 +1,12 @@
//! Common types.
/// Represents a sequence number of data in storage. The offset of logstore can be used
/// as a sequence number.
///
/// This is an alias of `u64`.
pub type SequenceNumber = u64;
/// Operation type of the value to write to storage.
///
/// `Put` is the only variant defined here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ValueType {
/// Put operation.
Put,
}

View File

@@ -3,11 +3,6 @@ name = "table"
version = "0.1.0"
edition = "2021"
[dependencies.arrow]
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dependencies]
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }

View File

@@ -6,7 +6,6 @@ use std::fmt::Debug;
use std::mem;
use std::sync::{Arc, Mutex};
use arrow::error::{ArrowError, Result as ArrowResult};
use common_query::logical_plan::Expr;
use common_recordbatch::error::{self as recordbatch_error, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
@@ -25,8 +24,9 @@ use datafusion::physical_plan::{
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
};
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::error::{ArrowError, Result as ArrowResult};
use datatypes::schema::SchemaRef as TableSchemaRef;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::schema::SchemaRef;
use futures::Stream;
use snafu::prelude::*;
@@ -109,6 +109,10 @@ impl DfTableProviderAdapter {
/// Creates an adapter that wraps the given `table`.
pub fn new(table: TableRef) -> Self {
Self { table }
}
/// Returns a clone of the wrapped table reference.
pub fn table(&self) -> TableRef {
self.table.clone()
}
}
#[async_trait::async_trait]
@@ -160,16 +164,18 @@ impl TableProvider for DfTableProviderAdapter {
/// Datafusion TableProvider -> greptime Table
pub struct TableAdapter {
schema: TableSchemaRef,
table_provider: Arc<dyn TableProvider>,
runtime: Arc<RuntimeEnv>,
}
impl TableAdapter {
pub fn new(table_provider: Arc<dyn TableProvider>, runtime: Arc<RuntimeEnv>) -> Self {
Self {
pub fn new(table_provider: Arc<dyn TableProvider>, runtime: Arc<RuntimeEnv>) -> Result<Self> {
Ok(Self {
schema: Arc::new(table_provider.schema().try_into().unwrap()),
table_provider,
runtime,
}
})
}
}
@@ -180,7 +186,7 @@ impl Table for TableAdapter {
}
fn schema(&self) -> TableSchemaRef {
Arc::new(self.table_provider.schema().into())
self.schema.clone()
}
fn table_type(&self) -> TableType {
@@ -211,7 +217,10 @@ impl Table for TableAdapter {
.await
.context(error::DatafusionSnafu)?;
Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream)))
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
df_stream,
)))
}
fn supports_filter_pushdown(&self, filter: &Expr) -> Result<TableProviderFilterPushDown> {
@@ -268,18 +277,19 @@ impl Stream for DfRecordBatchStreamAdapter {
/// Datafusion SendableRecordBatchStream to greptime RecordBatchStream
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
}
impl RecordBatchStreamAdapter {
pub fn new(stream: DfSendableRecordBatchStream) -> Self {
Self { stream }
pub fn new(schema: SchemaRef, stream: DfSendableRecordBatchStream) -> Self {
Self { schema, stream }
}
}
impl RecordBatchStream for RecordBatchStreamAdapter {
fn schema(&self) -> SchemaRef {
Arc::new(Schema::new(self.stream.schema()))
self.schema.clone()
}
}

View File

@@ -2,13 +2,12 @@ use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::field_util::SchemaExt;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::arrow::array::UInt32Array;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use futures::task::{Context, Poll};
use futures::Stream;
@@ -16,20 +15,20 @@ use crate::error::Result;
use crate::table::{Expr, Table};
/// numbers table for test
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone)]
pub struct NumbersTable {
schema: SchemaRef,
}
impl Default for NumbersTable {
fn default() -> Self {
let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
let column_schemas = vec![ColumnSchema::new(
"number",
DataType::UInt32,
ConcreteDataType::uint32_datatype(),
false,
)]));
)];
Self {
schema: Arc::new(Schema::new(arrow_schema)),
schema: Arc::new(Schema::new(column_schemas)),
}
}
}