From 4171173b76c5071ac86503f71a041ba14b9eb5f5 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 9 Jun 2022 16:50:02 +0800 Subject: [PATCH] feat: Support creating in memory region and writing to memtable (#40) * chore(store-api): Fix typo in region comments * feat(storage): Init storage crate * feat(store-api): Make some method async * feat(storage): Blank StorageEngine implementation * feat(storage): StorageEngine returns owned SchemaRef * feat: pub use arrow in datatypes * feat(store-api): Implement RegionMetadata * feat(storage): Impl create region in memory. * chore(object-store): Format cargo toml * chore(storage): Log on region created * feat: Impl CowCell * feat: Store id to cf meta mapping * refactor: Refactor version and rename it to VersionControl * feat: Impl write batch for put, refactor column family * feat(storage): Skeleton of writing to memtable * refactor(storage): MemTable returns MemTableSchema * feat: Add ColumnSchema and conversion between schema and arrow's schema * feat: Validate put data * feat: Valid schema of write batch * feat: insert memtable WIP * feat: Impl Inserter for memtable * feat(datatypes): Implement Eq/Ord for Value feat: Implement Ord/Eq for Bytes/StringBytes and Deref for Bytes test: Test Value::from() * feat: Define BTreeMemTable * Fix: Rename get/get_unchecked to try_get/get and fix get not consider null. * feat: Impl BTreeMemTable::write() * refactor: Remove useless ColumnFamilyHandle now * chore: Clean comment * feat(common): Add from `String/&str/Vec/&[u8]` for Value * test(storage): Add tests for WriteBatch * chore: Fix clippy * feat: Add builder for RowKey/ColumnFamilyDescriptor * test: Add test for metadata * chore: Fix clippy * test: Add test for region and engine * chore: Fix clippy * chore: Address CR comment --- Cargo.lock | 31 +- Cargo.toml | 1 + src/common/error/src/status_code.rs | 2 + src/common/function/src/scalars/math/pow.rs | 2 +- src/common/function/src/scalars/numpy/clip.rs | 18 +- src/common/function/src/scalars/udf.rs | 4 +- src/common/recordbatch/Cargo.toml | 7 +- src/common/recordbatch/src/error.rs | 2 +- src/common/recordbatch/src/recordbatch.rs | 2 +- src/common/recordbatch/src/util.rs | 4 +- src/datatypes/Cargo.toml | 1 + src/datatypes/src/data_type.rs | 21 +- src/datatypes/src/error.rs | 9 + src/datatypes/src/lib.rs | 2 + src/datatypes/src/scalar.rs | 0 src/datatypes/src/schema.rs | 150 +++++- src/datatypes/src/type_id.rs | 30 +- src/datatypes/src/value.rs | 104 +++- src/datatypes/src/vectors.rs | 38 +- src/datatypes/src/vectors/binary.rs | 22 +- src/datatypes/src/vectors/boolean.rs | 17 +- src/datatypes/src/vectors/constant.rs | 10 +- src/datatypes/src/vectors/null.rs | 11 +- src/datatypes/src/vectors/primitive.rs | 16 +- src/datatypes/src/vectors/string.rs | 27 +- src/object-store/Cargo.toml | 2 +- src/query/src/catalog/schema.rs | 8 +- src/query/src/datafusion.rs | 7 +- src/query/src/datafusion/catalog_adapter.rs | 41 +- src/query/src/datafusion/error.rs | 16 +- src/query/src/datafusion/plan_adapter.rs | 5 +- src/storage/Cargo.toml | 16 + src/storage/src/engine.rs | 134 +++++ src/storage/src/error.rs | 75 +++ src/storage/src/lib.rs | 17 + src/storage/src/memtable.rs | 99 ++++ src/storage/src/memtable/btree.rs | 136 ++++++ src/storage/src/memtable/inserter.rs | 102 ++++ src/storage/src/memtable/schema.rs | 31 ++ src/storage/src/metadata.rs | 460 ++++++++++++++++++ src/storage/src/region.rs | 122 +++++ src/storage/src/region_writer.rs | 43 ++ src/storage/src/snapshot.rs | 26 + src/storage/src/sync.rs | 125 +++++ 
src/storage/src/test_util.rs | 3 + src/storage/src/test_util/descriptor_util.rs | 78 +++ src/storage/src/test_util/schema_util.rs | 23 + src/storage/src/test_util/write_batch_util.rs | 10 + src/storage/src/version.rs | 60 +++ src/storage/src/write_batch.rs | 442 +++++++++++++++++ src/store-api/Cargo.toml | 2 +- src/store-api/src/storage.rs | 15 +- src/store-api/src/storage/column_family.rs | 4 - src/store-api/src/storage/consts.rs | 27 + src/store-api/src/storage/descriptors.rs | 272 ++++++++++- src/store-api/src/storage/engine.rs | 39 +- src/store-api/src/storage/metadata.rs | 7 + src/store-api/src/storage/region.rs | 30 +- src/store-api/src/storage/requests.rs | 26 +- src/store-api/src/storage/snapshot.rs | 16 +- src/store-api/src/storage/types.rs | 12 + src/table/Cargo.toml | 5 - src/table/src/table/adapter.rs | 30 +- src/table/src/table/numbers.rs | 17 +- 64 files changed, 2911 insertions(+), 203 deletions(-) create mode 100644 src/datatypes/src/scalar.rs create mode 100644 src/storage/Cargo.toml create mode 100644 src/storage/src/engine.rs create mode 100644 src/storage/src/error.rs create mode 100644 src/storage/src/lib.rs create mode 100644 src/storage/src/memtable.rs create mode 100644 src/storage/src/memtable/btree.rs create mode 100644 src/storage/src/memtable/inserter.rs create mode 100644 src/storage/src/memtable/schema.rs create mode 100644 src/storage/src/metadata.rs create mode 100644 src/storage/src/region.rs create mode 100644 src/storage/src/region_writer.rs create mode 100644 src/storage/src/snapshot.rs create mode 100644 src/storage/src/sync.rs create mode 100644 src/storage/src/test_util.rs create mode 100644 src/storage/src/test_util/descriptor_util.rs create mode 100644 src/storage/src/test_util/schema_util.rs create mode 100644 src/storage/src/test_util/write_batch_util.rs create mode 100644 src/storage/src/version.rs create mode 100644 src/storage/src/write_batch.rs delete mode 100644 src/store-api/src/storage/column_family.rs create mode 100644 src/store-api/src/storage/consts.rs create mode 100644 src/store-api/src/storage/metadata.rs create mode 100644 src/store-api/src/storage/types.rs diff --git a/Cargo.lock b/Cargo.lock index d6f13ecf65..9df08ba630 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,6 +67,12 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" +[[package]] +name = "arc-swap" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" + [[package]] name = "array-init-cursor" version = "0.2.0" @@ -918,6 +924,7 @@ dependencies = [ "common-error", "datafusion-common", "enum_dispatch", + "ordered-float 3.0.0", "paste", "serde", "serde_json", @@ -2116,6 +2123,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96bcbab4bfea7a59c2c0fe47211a1ac4e3e96bea6eb446d704f310bc5c732ae2" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.4.3" @@ -2959,6 +2975,20 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "storage" +version = "0.1.0" +dependencies = [ + "arc-swap", + "async-trait", + "common-error", + "common-telemetry", + "datatypes", + "snafu", + 
"store-api", + "tokio", +] + [[package]] name = "store-api" version = "0.1.0" @@ -3044,7 +3074,6 @@ checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" name = "table" version = "0.1.0" dependencies = [ - "arrow2", "async-trait", "chrono", "common-error", diff --git a/Cargo.toml b/Cargo.toml index 67408e9658..ba7660ec76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "src/object-store", "src/query", "src/sql", + "src/storage", "src/store-api", "src/table", ] diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 0f3286e69e..d5a5745dfa 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -12,6 +12,8 @@ pub enum StatusCode { Unexpected, /// Internal server error. Internal, + /// Invalid arguments. + InvalidArguments, // ====== End of common status code ================ // ====== Begin of SQL related status code ========= diff --git a/src/common/function/src/scalars/math/pow.rs b/src/common/function/src/scalars/math/pow.rs index 05cf157c00..7e85dffd15 100644 --- a/src/common/function/src/scalars/math/pow.rs +++ b/src/common/function/src/scalars/math/pow.rs @@ -99,7 +99,7 @@ mod tests { for i in 0..3 { let p: f64 = (values[i] as f64).pow(bases[i] as f64); - assert!(matches!(vector.get_unchecked(i), Value::Float64(v) if v == p)); + assert!(matches!(vector.get(i), Value::Float64(v) if v == p)); } } } diff --git a/src/common/function/src/scalars/numpy/clip.rs b/src/common/function/src/scalars/numpy/clip.rs index c1218e1fc8..6cd83b6567 100644 --- a/src/common/function/src/scalars/numpy/clip.rs +++ b/src/common/function/src/scalars/numpy/clip.rs @@ -198,11 +198,11 @@ mod tests { // clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6] for i in 0..10 { if i <= 3 { - assert!(matches!(vector.get_unchecked(i), Value::Int64(v) if v == 3)); + assert!(matches!(vector.get(i), Value::Int64(v) if v == 3)); } else if i <= 6 { - assert!(matches!(vector.get_unchecked(i), Value::Int64(v) if v == (i as i64))); + assert!(matches!(vector.get(i), Value::Int64(v) if v == (i as i64))); } else { - assert!(matches!(vector.get_unchecked(i), Value::Int64(v) if v == 6)); + assert!(matches!(vector.get(i), Value::Int64(v) if v == 6)); } } @@ -225,11 +225,11 @@ mod tests { // clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6] for i in 0..10 { if i <= 3 { - assert!(matches!(vector.get_unchecked(i), Value::UInt64(v) if v == 3)); + assert!(matches!(vector.get(i), Value::UInt64(v) if v == 3)); } else if i <= 6 { - assert!(matches!(vector.get_unchecked(i), Value::UInt64(v) if v == (i as u64))); + assert!(matches!(vector.get(i), Value::UInt64(v) if v == (i as u64))); } else { - assert!(matches!(vector.get_unchecked(i), Value::UInt64(v) if v == 6)); + assert!(matches!(vector.get(i), Value::UInt64(v) if v == 6)); } } @@ -252,11 +252,11 @@ mod tests { // clip([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, 6) = [3, 3, 3, 3, 4, 5, 6, 6, 6, 6] for i in 0..10 { if i <= 3 { - assert!(matches!(vector.get_unchecked(i), Value::Float64(v) if v == 3.0)); + assert!(matches!(vector.get(i), Value::Float64(v) if v == 3.0)); } else if i <= 6 { - assert!(matches!(vector.get_unchecked(i), Value::Float64(v) if v == (i as f64))); + assert!(matches!(vector.get(i), Value::Float64(v) if v == (i as f64))); } else { - assert!(matches!(vector.get_unchecked(i), Value::Float64(v) if v == 6.0)); + assert!(matches!(vector.get(i), Value::Float64(v) if v == 6.0)); } } } diff --git 
a/src/common/function/src/scalars/udf.rs b/src/common/function/src/scalars/udf.rs index a4d8801ed9..96e5149344 100644 --- a/src/common/function/src/scalars/udf.rs +++ b/src/common/function/src/scalars/udf.rs @@ -83,9 +83,7 @@ mod tests { assert_eq!(3, vector.len()); for i in 0..3 { - assert!( - matches!(vector.get_unchecked(i), Value::Boolean(b) if b == (i == 0 || i == 2)) - ); + assert!(matches!(vector.get(i), Value::Boolean(b) if b == (i == 0 || i == 2))); } // create a udf and test it again diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml index d59ccf425b..e7ae297edf 100644 --- a/src/common/recordbatch/Cargo.toml +++ b/src/common/recordbatch/Cargo.toml @@ -3,15 +3,10 @@ name = "common-recordbatch" version = "0.1.0" edition = "2021" -[dependencies.arrow] -package = "arrow2" -version="0.10" -features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] - [dependencies] datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"} -datatypes = {path ="../../datatypes" } +datatypes = { path = "../../datatypes" } futures = "0.3" paste = "1.0" serde = "1.0" diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 0d621a668b..494258fa65 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -1,4 +1,4 @@ -use arrow::error::ArrowError; +use datatypes::arrow::error::ArrowError; use snafu::{Backtrace, Snafu}; #[derive(Debug, Snafu)] diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 233c775eca..8fcf58fdef 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -49,7 +49,7 @@ mod tests { DataType::UInt32, false, )])); - let schema = Arc::new(Schema::new(arrow_schema.clone())); + let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap()); let numbers: Vec = (0..10).collect(); let df_batch = DfRecordBatch::try_new( diff --git a/src/common/recordbatch/src/util.rs b/src/common/recordbatch/src/util.rs index f53309daa8..1df50f2b20 100644 --- a/src/common/recordbatch/src/util.rs +++ b/src/common/recordbatch/src/util.rs @@ -56,7 +56,7 @@ mod tests { DataType::UInt32, false, )])); - let schema = Arc::new(Schema::new(arrow_schema.clone())); + let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap()); let stream = MockRecordBatchStream { schema: schema.clone(), @@ -79,7 +79,7 @@ mod tests { }; let stream = MockRecordBatchStream { - schema: Arc::new(Schema::new(arrow_schema)), + schema: Arc::new(Schema::try_from(arrow_schema).unwrap()), batch: Some(batch.clone()), }; let batches = collect(Box::pin(stream)).await.unwrap(); diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index b10d34296b..7b6ebddd9b 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -13,6 +13,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2" } enum_dispatch = "0.3" +ordered-float = "3.0" paste = "1.0" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0" diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 54197fca8e..2f2277ca82 100644 --- a/src/datatypes/src/data_type.rs +++ 
b/src/datatypes/src/data_type.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use arrow::datatypes::DataType as ArrowDataType; use paste::paste; +use crate::error::{self, Error, Result}; use crate::type_id::LogicalTypeId; use crate::types::{ BinaryType, BooleanType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, @@ -81,7 +82,15 @@ impl ConcreteDataType { /// # Panics /// Panic if given arrow data type is not supported. pub fn from_arrow_type(dt: &ArrowDataType) -> Self { - match dt { + ConcreteDataType::try_from(dt).expect("Unimplemented type") + } +} + +impl TryFrom<&ArrowDataType> for ConcreteDataType { + type Error = Error; + + fn try_from(dt: &ArrowDataType) -> Result { + let concrete_type = match dt { ArrowDataType::Null => Self::null_datatype(), ArrowDataType::Boolean => Self::boolean_datatype(), ArrowDataType::UInt8 => Self::uint8_datatype(), @@ -97,9 +106,14 @@ impl ConcreteDataType { ArrowDataType::Binary | ArrowDataType::LargeBinary => Self::binary_datatype(), ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => Self::string_datatype(), _ => { - unimplemented!("arrow data_type: {:?}", dt) + return error::UnsupportedArrowTypeSnafu { + arrow_type: dt.clone(), + } + .fail() } - } + }; + + Ok(concrete_type) } } @@ -122,6 +136,7 @@ impl_new_concrete_type_functions!( Binary, String ); +/// Data type abstraction. #[enum_dispatch::enum_dispatch] pub trait DataType: std::fmt::Debug + Send + Sync { /// Name of this data type. diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 287126dfff..e6d7dc407a 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -11,16 +11,25 @@ pub enum Error { source: serde_json::Error, backtrace: Backtrace, }, + #[snafu(display("Failed to convert datafusion type: {}", from))] Conversion { from: String, backtrace: Backtrace }, + #[snafu(display("Bad array access, Index out of bounds: {}, size: {}", index, size))] BadArrayAccess { index: usize, size: usize, backtrace: Backtrace, }, + #[snafu(display("Unknown vector, {}", msg))] UnknownVector { msg: String, backtrace: Backtrace }, + + #[snafu(display("Unsupported arrow data type, type: {:?}", arrow_type))] + UnsupportedArrowType { + arrow_type: arrow::datatypes::DataType, + backtrace: Backtrace, + }, } impl ErrorExt for Error { diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs index d7bd7ed6ec..91c76247d2 100644 --- a/src/datatypes/src/lib.rs +++ b/src/datatypes/src/lib.rs @@ -13,3 +13,5 @@ pub mod type_id; pub mod types; pub mod value; pub mod vectors; + +pub use arrow; diff --git a/src/datatypes/src/scalar.rs b/src/datatypes/src/scalar.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index d0556a426a..9f11b340e1 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -1,25 +1,161 @@ +use std::collections::HashMap; use std::sync::Arc; -use arrow::datatypes::Schema as ArrowSchema; +use arrow::datatypes::{Field, Schema as ArrowSchema}; -#[derive(Debug, Clone, Eq, PartialEq)] +use crate::data_type::{ConcreteDataType, DataType}; +use crate::error::{Error, Result}; + +// TODO(yingwen): consider assign a version to schema so compare schema can be +// done by compare version. 
+ +#[derive(Debug, Clone, PartialEq)] +pub struct ColumnSchema { + pub name: String, + pub data_type: ConcreteDataType, + pub is_nullable: bool, +} + +impl ColumnSchema { + pub fn new<T: Into<String>>( + name: T, + data_type: ConcreteDataType, + is_nullable: bool, + ) -> ColumnSchema { + ColumnSchema { + name: name.into(), + data_type, + is_nullable, + } + } +} + +#[derive(Debug, Clone, PartialEq)] pub struct Schema { + column_schemas: Vec<ColumnSchema>, + name_to_index: HashMap<String, usize>, arrow_schema: Arc<ArrowSchema>, } impl Schema { - pub fn new(arrow_schema: Arc<ArrowSchema>) -> Self { - Self { arrow_schema } + pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema { + let mut fields = Vec::with_capacity(column_schemas.len()); + let mut name_to_index = HashMap::with_capacity(column_schemas.len()); + for (index, column_schema) in column_schemas.iter().enumerate() { + let field = Field::from(column_schema); + fields.push(field); + name_to_index.insert(column_schema.name.clone(), index); + } + let arrow_schema = Arc::new(ArrowSchema::from(fields)); + + Schema { + column_schemas, + name_to_index, + arrow_schema, + } } + pub fn arrow_schema(&self) -> &Arc<ArrowSchema> { &self.arrow_schema } + + pub fn column_schemas(&self) -> &[ColumnSchema] { + &self.column_schemas + } + + pub fn column_schema_by_name(&self, name: &str) -> Option<&ColumnSchema> { + self.name_to_index + .get(name) + .map(|index| &self.column_schemas[*index]) + } } pub type SchemaRef = Arc<Schema>; -impl From<Arc<ArrowSchema>> for Schema { - fn from(s: Arc<ArrowSchema>) -> Schema { - Schema::new(s) +impl TryFrom<&Field> for ColumnSchema { + type Error = Error; + + fn try_from(field: &Field) -> Result<ColumnSchema> { + let data_type = ConcreteDataType::try_from(&field.data_type)?; + + Ok(ColumnSchema { + name: field.name.clone(), + data_type, + is_nullable: field.is_nullable, + }) + } +} + +impl From<&ColumnSchema> for Field { + fn from(column_schema: &ColumnSchema) -> Field { + Field::new( + column_schema.name.clone(), + column_schema.data_type.as_arrow_type(), + column_schema.is_nullable, + ) + } +} + +impl TryFrom<Arc<ArrowSchema>> for Schema { + type Error = Error; + + fn try_from(arrow_schema: Arc<ArrowSchema>) -> Result<Schema> { + let mut column_schemas = Vec::with_capacity(arrow_schema.fields.len()); + let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len()); + for field in &arrow_schema.fields { + let column_schema = ColumnSchema::try_from(field)?; + name_to_index.insert(field.name.clone(), column_schemas.len()); + column_schemas.push(column_schema); + } + + Ok(Self { + column_schemas, + name_to_index, + arrow_schema, + }) + } +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::DataType as ArrowDataType; + + use super::*; + + #[test] + fn test_column_schema() { + let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true); + let field = Field::from(&column_schema); + assert_eq!("test", field.name); + assert_eq!(ArrowDataType::Int32, field.data_type); + assert!(field.is_nullable); + + let new_column_schema = ColumnSchema::try_from(&field).unwrap(); + assert_eq!(column_schema, new_column_schema); + } + + #[test] + fn test_schema() { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), true), + ]; + let schema = Schema::new(column_schemas.clone()); + + for column_schema in &column_schemas { + let found = schema.column_schema_by_name(&column_schema.name).unwrap(); + assert_eq!(column_schema, found); + } + assert!(schema.column_schema_by_name("col3").is_none()); + + let fields: Vec<_> = column_schemas.iter().map(Field::from).collect(); + 
let arrow_schema = Arc::new(ArrowSchema::from(fields)); + + let new_schema = Schema::try_from(arrow_schema.clone()).unwrap(); + + assert_eq!(schema, new_schema); + assert_eq!(column_schemas, schema.column_schemas()); + assert_eq!(arrow_schema, *schema.arrow_schema()); + assert_eq!(arrow_schema, *new_schema.arrow_schema()); } } diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index dddc28ba40..629604c31f 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -1,5 +1,7 @@ +use crate::data_type::ConcreteDataType; + /// Unique identifier for logical data type. -#[derive(Debug, PartialEq)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum LogicalTypeId { Null, @@ -28,3 +30,29 @@ pub enum LogicalTypeId { /// seconds/milliseconds/microseconds/nanoseconds, determined by precision. DateTime, } + +impl LogicalTypeId { + /// # Panics + /// Panics if data type is not supported. + pub fn data_type(&self) -> ConcreteDataType { + match self { + LogicalTypeId::Null => ConcreteDataType::null_datatype(), + LogicalTypeId::Boolean => ConcreteDataType::boolean_datatype(), + LogicalTypeId::Int8 => ConcreteDataType::int8_datatype(), + LogicalTypeId::Int16 => ConcreteDataType::int16_datatype(), + LogicalTypeId::Int32 => ConcreteDataType::int32_datatype(), + LogicalTypeId::Int64 => ConcreteDataType::int64_datatype(), + LogicalTypeId::UInt8 => ConcreteDataType::uint8_datatype(), + LogicalTypeId::UInt16 => ConcreteDataType::uint16_datatype(), + LogicalTypeId::UInt32 => ConcreteDataType::uint32_datatype(), + LogicalTypeId::UInt64 => ConcreteDataType::uint64_datatype(), + LogicalTypeId::Float32 => ConcreteDataType::float32_datatype(), + LogicalTypeId::Float64 => ConcreteDataType::float64_datatype(), + LogicalTypeId::String => ConcreteDataType::string_datatype(), + LogicalTypeId::Binary => ConcreteDataType::binary_datatype(), + LogicalTypeId::Date | LogicalTypeId::DateTime => { + unimplemented!("Data type for {:?} is unimplemented", self) + } + } + } +} diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index f81c54af8f..dee4b25364 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -1,8 +1,16 @@ use common_base::bytes::{Bytes, StringBytes}; +use ordered_float::OrderedFloat; use serde::{Serialize, Serializer}; +pub type OrderedF32 = OrderedFloat<f32>; +pub type OrderedF64 = OrderedFloat<f64>; + /// Value holds a single arbitrary value of any [DataType](crate::data_type::DataType). -#[derive(Debug, PartialEq, Clone)] +/// +/// Although comparing Values of different data types is allowed, it is recommended to only +/// compare Values of the same data type, since a cross-type comparison may not +/// behave as you expect. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum Value { Null, Boolean(bool), UInt8(u8), UInt16(u16), UInt32(u32), UInt64(u64), Int8(i8), Int16(i16), Int32(i32), Int64(i64), - Float32(f32), - Float64(f64), + Float32(OrderedF32), + Float64(OrderedF64), // String types: String(StringBytes), Binary(Bytes), } @@ -32,14 +40,14 @@ macro_rules!
impl_from { ($Variant:ident, $Type:ident) => { impl From<$Type> for Value { fn from(value: $Type) -> Self { - Value::$Variant(value) + Value::$Variant(value.into()) } } impl From> for Value { fn from(value: Option<$Type>) -> Self { match value { - Some(v) => Value::$Variant(v), + Some(v) => Value::$Variant(v.into()), None => Value::Null, } } @@ -110,3 +118,89 @@ impl Serialize for Value { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_value_from_inner() { + assert_eq!(Value::Boolean(true), Value::from(true)); + assert_eq!(Value::Boolean(false), Value::from(false)); + + assert_eq!(Value::UInt8(u8::MIN), Value::from(u8::MIN)); + assert_eq!(Value::UInt8(u8::MAX), Value::from(u8::MAX)); + + assert_eq!(Value::UInt16(u16::MIN), Value::from(u16::MIN)); + assert_eq!(Value::UInt16(u16::MAX), Value::from(u16::MAX)); + + assert_eq!(Value::UInt32(u32::MIN), Value::from(u32::MIN)); + assert_eq!(Value::UInt32(u32::MAX), Value::from(u32::MAX)); + + assert_eq!(Value::UInt64(u64::MIN), Value::from(u64::MIN)); + assert_eq!(Value::UInt64(u64::MAX), Value::from(u64::MAX)); + + assert_eq!(Value::Int8(i8::MIN), Value::from(i8::MIN)); + assert_eq!(Value::Int8(i8::MAX), Value::from(i8::MAX)); + + assert_eq!(Value::Int16(i16::MIN), Value::from(i16::MIN)); + assert_eq!(Value::Int16(i16::MAX), Value::from(i16::MAX)); + + assert_eq!(Value::Int32(i32::MIN), Value::from(i32::MIN)); + assert_eq!(Value::Int32(i32::MAX), Value::from(i32::MAX)); + + assert_eq!(Value::Int64(i64::MIN), Value::from(i64::MIN)); + assert_eq!(Value::Int64(i64::MAX), Value::from(i64::MAX)); + + assert_eq!( + Value::Float32(OrderedFloat(f32::MIN)), + Value::from(f32::MIN) + ); + assert_eq!( + Value::Float32(OrderedFloat(f32::MAX)), + Value::from(f32::MAX) + ); + + assert_eq!( + Value::Float64(OrderedFloat(f64::MIN)), + Value::from(f64::MIN) + ); + assert_eq!( + Value::Float64(OrderedFloat(f64::MAX)), + Value::from(f64::MAX) + ); + + let string_bytes = StringBytes::from("hello"); + assert_eq!( + Value::String(string_bytes.clone()), + Value::from(string_bytes) + ); + + let bytes = Bytes::from(b"world".as_slice()); + assert_eq!(Value::Binary(bytes.clone()), Value::from(bytes)); + } + + #[test] + fn test_value_from_string() { + let hello = "hello".to_string(); + assert_eq!( + Value::String(StringBytes::from(hello.clone())), + Value::from(hello) + ); + + let world = "world"; + assert_eq!(Value::String(StringBytes::from(world)), Value::from(world)); + } + + #[test] + fn test_value_from_bytes() { + let hello = b"hello".to_vec(); + assert_eq!( + Value::Binary(Bytes::from(hello.clone())), + Value::from(hello) + ); + + let world: &[u8] = b"world"; + assert_eq!(Value::Binary(Bytes::from(world)), Value::from(world)); + } +} diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index 2d7c82c741..89134df4e4 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -104,11 +104,15 @@ pub trait Vector: Send + Sync + Serializable { fn slice(&self, offset: usize, length: usize) -> VectorRef; - /// # Safety - /// Assumes that the `index` is smaller than size. - fn get_unchecked(&self, index: usize) -> Value; + /// Returns the clone of value at `index`. + /// + /// # Panics + /// Panic if `index` is out of bound. + fn get(&self, index: usize) -> Value; - fn get(&self, index: usize) -> Result { + /// Returns the clone of value at `index` or error if `index` + /// is out of bound. 
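+    ///
+    /// A sketch of the contract (any concrete, non-empty vector works here):
+    ///
+    /// ```ignore
+    /// assert_eq!(vector.get(0), vector.try_get(0).unwrap()); // in bounds: same value as `get`
+    /// assert!(vector.try_get(vector.len()).is_err());        // out of bounds: Err, not panic
+    /// ```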
+ fn try_get(&self, index: usize) -> Result { ensure!( index < self.len(), BadArrayAccessSnafu { @@ -116,7 +120,7 @@ pub trait Vector: Send + Sync + Serializable { size: self.len() } ); - Ok(self.get_unchecked(index)) + Ok(self.get(index)) } // Copies each element according offsets parameter. @@ -147,7 +151,29 @@ macro_rules! impl_try_from_arrow_array_for_vector { }; } -pub(crate) use impl_try_from_arrow_array_for_vector; +macro_rules! impl_validity_for_vector { + ($array: expr) => { + match $array.validity() { + Some(bitmap) => Validity::Slots(bitmap), + None => Validity::AllValid, + } + }; +} + +macro_rules! impl_get_for_vector { + ($array: expr, $index: ident) => { + if $array.is_valid($index) { + // Safety: The index have been checked by `is_valid()`. + unsafe { $array.value_unchecked($index).into() } + } else { + Value::Null + } + }; +} + +pub(crate) use { + impl_get_for_vector, impl_try_from_arrow_array_for_vector, impl_validity_for_vector, +}; #[cfg(test)] pub mod tests { diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs index 0f205fa3b9..074bd2140b 100644 --- a/src/datatypes/src/vectors/binary.rs +++ b/src/datatypes/src/vectors/binary.rs @@ -14,8 +14,7 @@ use crate::error::SerializeSnafu; use crate::scalars::{common, ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::value::Value; -use crate::vectors::impl_try_from_arrow_array_for_vector; -use crate::vectors::{MutableVector, Validity, Vector, VectorRef}; +use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; /// Vector of binary strings. #[derive(Debug)] @@ -59,10 +58,7 @@ impl Vector for BinaryVector { } fn validity(&self) -> Validity { - match self.array.validity() { - Some(bitmap) => Validity::Slots(bitmap), - None => Validity::AllValid, - } + vectors::impl_validity_for_vector!(self.array) } fn is_null(&self, row: usize) -> bool { @@ -73,8 +69,8 @@ impl Vector for BinaryVector { Arc::new(Self::from(self.array.slice(offset, length))) } - fn get_unchecked(&self, index: usize) -> Value { - self.array.value(index).into() + fn get(&self, index: usize) -> Value { + vectors::impl_get_for_vector!(self.array, index) } fn replicate(&self, offsets: &[usize]) -> VectorRef { @@ -159,7 +155,7 @@ impl Serializable for BinaryVector { } } -impl_try_from_arrow_array_for_vector!(LargeBinaryArray, BinaryVector); +vectors::impl_try_from_arrow_array_for_vector!(LargeBinaryArray, BinaryVector); #[cfg(test)] mod tests { @@ -186,10 +182,7 @@ mod tests { for i in 0..2 { assert!(!v.is_null(i)); - assert_eq!( - Value::Binary(Bytes::from(vec![1, 2, 3])), - v.get_unchecked(i) - ); + assert_eq!(Value::Binary(Bytes::from(vec![1, 2, 3])), v.get(i)); } let arrow_arr = v.to_arrow_array(); @@ -246,6 +239,9 @@ mod tests { assert_eq!(b"hello", vector.get_data(0).unwrap()); assert_eq!(None, vector.get_data(3)); + assert_eq!(Value::Binary(b"hello".as_slice().into()), vector.get(0)); + assert_eq!(Value::Null, vector.get(3)); + let mut iter = vector.iter_data(); assert_eq!(b"hello", iter.next().unwrap().unwrap()); assert_eq!(b"happy", iter.next().unwrap().unwrap()); diff --git a/src/datatypes/src/vectors/boolean.rs b/src/datatypes/src/vectors/boolean.rs index a98240e579..6df850c333 100644 --- a/src/datatypes/src/vectors/boolean.rs +++ b/src/datatypes/src/vectors/boolean.rs @@ -13,8 +13,7 @@ use crate::scalars::common::replicate_scalar_vector; use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::value::Value; -use 
crate::vectors::impl_try_from_arrow_array_for_vector; -use crate::vectors::{MutableVector, Validity, Vector, VectorRef}; +use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; /// Vector of boolean. #[derive(Debug)] @@ -74,10 +73,7 @@ impl Vector for BooleanVector { } fn validity(&self) -> Validity { - match self.array.validity() { - Some(bitmap) => Validity::Slots(bitmap), - None => Validity::AllValid, - } + vectors::impl_validity_for_vector!(self.array) } fn is_null(&self, row: usize) -> bool { @@ -88,8 +84,8 @@ impl Vector for BooleanVector { Arc::new(Self::from(self.array.slice(offset, length))) } - fn get_unchecked(&self, index: usize) -> Value { - self.array.value(index).into() + fn get(&self, index: usize) -> Value { + vectors::impl_get_for_vector!(self.array, index) } fn replicate(&self, offsets: &[usize]) -> VectorRef { @@ -171,7 +167,7 @@ impl Serializable for BooleanVector { } } -impl_try_from_arrow_array_for_vector!(BooleanArray, BooleanVector); +vectors::impl_try_from_arrow_array_for_vector!(BooleanArray, BooleanVector); #[cfg(test)] mod tests { @@ -193,7 +189,7 @@ mod tests { for (i, b) in bools.iter().enumerate() { assert!(!v.is_null(i)); - assert_eq!(Value::Boolean(*b), v.get_unchecked(i)); + assert_eq!(Value::Boolean(*b), v.get(i)); } let arrow_arr = v.to_arrow_array(); @@ -268,6 +264,7 @@ mod tests { for (i, v) in input.into_iter().enumerate() { assert_eq!(v, vector.get_data(i)); + assert_eq!(Value::from(v), vector.get(i)); } } diff --git a/src/datatypes/src/vectors/constant.rs b/src/datatypes/src/vectors/constant.rs index b696fc1824..ec58befd38 100644 --- a/src/datatypes/src/vectors/constant.rs +++ b/src/datatypes/src/vectors/constant.rs @@ -81,8 +81,8 @@ impl Vector for ConstantVector { }) } - fn get_unchecked(&self, _index: usize) -> Value { - self.vector.get_unchecked(0) + fn get(&self, _index: usize) -> Value { + self.vector.get(0) } fn replicate(&self, offsets: &[usize]) -> VectorRef { @@ -100,7 +100,7 @@ impl fmt::Debug for ConstantVector { write!( f, "ConstantVector([{:?}; {}])", - self.get(0).unwrap_or(Value::Null), + self.try_get(0).unwrap_or(Value::Null), self.len() ) } @@ -108,7 +108,7 @@ impl fmt::Debug for ConstantVector { impl Serializable for ConstantVector { fn serialize_to_json(&self) -> Result> { - std::iter::repeat(self.get(0)?) + std::iter::repeat(self.try_get(0)?) .take(self.len()) .map(serde_json::to_value) .collect::>() @@ -136,7 +136,7 @@ mod tests { for i in 0..10 { assert!(!c.is_null(i)); - assert_eq!(Value::Int32(1), c.get_unchecked(i)); + assert_eq!(Value::Int32(1), c.get(i)); } let arrow_arr = c.to_arrow_array(); diff --git a/src/datatypes/src/vectors/null.rs b/src/datatypes/src/vectors/null.rs index 9ab28890f6..0f3deec579 100644 --- a/src/datatypes/src/vectors/null.rs +++ b/src/datatypes/src/vectors/null.rs @@ -62,10 +62,6 @@ impl Vector for NullVector { true } - fn get_unchecked(&self, _index: usize) -> Value { - Value::Null - } - fn only_null(&self) -> bool { true } @@ -74,6 +70,11 @@ impl Vector for NullVector { Arc::new(Self::new(length)) } + fn get(&self, _index: usize) -> Value { + // Skips bound check for null array. 
+ Value::Null + } + fn replicate(&self, offsets: &[usize]) -> VectorRef { debug_assert!( offsets.len() == self.len(), @@ -127,7 +128,7 @@ mod tests { for i in 0..32 { assert!(v.is_null(i)); - assert_eq!(Value::Null, v.get_unchecked(i)); + assert_eq!(Value::Null, v.get(i)); } } diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 72cdad8fb1..165a3626d4 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -16,7 +16,7 @@ use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::types::{DataTypeBuilder, Primitive}; use crate::value::Value; -use crate::vectors::{MutableVector, Validity, Vector, VectorRef}; +use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; /// Vector for primitive data types. #[derive(Debug)] @@ -81,10 +81,7 @@ impl Vector for PrimitiveVector { } fn validity(&self) -> Validity { - match self.array.validity() { - Some(bitmap) => Validity::Slots(bitmap), - None => Validity::AllValid, - } + vectors::impl_validity_for_vector!(self.array) } fn is_null(&self, row: usize) -> bool { @@ -95,8 +92,8 @@ impl Vector for PrimitiveVector { Arc::new(Self::from(self.array.slice(offset, length))) } - fn get_unchecked(&self, index: usize) -> Value { - self.array.value(index).into() + fn get(&self, index: usize) -> Value { + vectors::impl_get_for_vector!(self.array, index) } fn replicate(&self, offsets: &[usize]) -> VectorRef { @@ -288,7 +285,7 @@ mod tests { for i in 0..4 { assert!(!v.is_null(i)); - assert_eq!(Value::Int32(i as i32 + 1), v.get_unchecked(i)); + assert_eq!(Value::Int32(i as i32 + 1), v.get(i)); } let json_value = v.serialize_to_json().unwrap(); @@ -352,6 +349,7 @@ mod tests { for (i, v) in input.into_iter().enumerate() { assert_eq!(v, vector.get_data(i)); + assert_eq!(Value::from(v), vector.get(i)); } let res: Vec<_> = vector.iter_data().collect(); @@ -388,7 +386,7 @@ mod tests { assert_eq!(4, v.len()); for i in 0..4 { - assert_eq!(Value::Int32(i as i32 + 1), v.get_unchecked(i)); + assert_eq!(Value::Int32(i as i32 + 1), v.get(i)); } } } diff --git a/src/datatypes/src/vectors/string.rs b/src/datatypes/src/vectors/string.rs index be387d7481..34eb175127 100644 --- a/src/datatypes/src/vectors/string.rs +++ b/src/datatypes/src/vectors/string.rs @@ -10,12 +10,11 @@ use snafu::ResultExt; use crate::arrow_array::{MutableStringArray, StringArray}; use crate::data_type::ConcreteDataType; use crate::error::SerializeSnafu; -use crate::prelude::{MutableVector, ScalarVectorBuilder, Validity, Vector, VectorRef}; -use crate::scalars::{common, ScalarVector}; +use crate::scalars::{common, ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; use crate::types::StringType; use crate::value::Value; -use crate::vectors::impl_try_from_arrow_array_for_vector; +use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; /// String array wrapper #[derive(Debug, Clone)] @@ -91,10 +90,7 @@ impl Vector for StringVector { } fn validity(&self) -> Validity { - match self.array.validity() { - Some(bitmap) => Validity::Slots(bitmap), - None => Validity::AllValid, - } + vectors::impl_validity_for_vector!(self.array) } fn is_null(&self, row: usize) -> bool { @@ -105,8 +101,8 @@ impl Vector for StringVector { Arc::new(Self::from(self.array.slice(offset, length))) } - fn get_unchecked(&self, index: usize) -> Value { - self.array.value(index).into() + fn get(&self, index: usize) -> Value { + 
vectors::impl_get_for_vector!(self.array, index) } fn replicate(&self, offsets: &[usize]) -> VectorRef { @@ -191,7 +187,7 @@ impl Serializable for StringVector { } } -impl_try_from_arrow_array_for_vector!(StringArray, StringVector); +vectors::impl_try_from_arrow_array_for_vector!(StringArray, StringVector); #[cfg(test)] mod tests { @@ -211,8 +207,8 @@ mod tests { assert!(!v.only_null()); for (i, s) in strs.iter().enumerate() { - assert_eq!(Value::from(*s), v.get_unchecked(i)); - assert_eq!(Value::from(*s), v.get(i).unwrap()); + assert_eq!(Value::from(*s), v.get(i)); + assert_eq!(Value::from(*s), v.try_get(i).unwrap()); } let arrow_arr = v.to_arrow_array(); @@ -259,6 +255,13 @@ mod tests { assert_eq!(None, vector.get_data(1)); assert_eq!(Some("world"), vector.get_data(2)); + // Get out of bound + assert!(vector.try_get(3).is_err()); + + assert_eq!(Value::String("hello".into()), vector.get(0)); + assert_eq!(Value::Null, vector.get(1)); + assert_eq!(Value::String("world".into()), vector.get(2)); + let mut iter = vector.iter_data(); assert_eq!("hello", iter.next().unwrap().unwrap()); assert_eq!(None, iter.next().unwrap()); diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 0ccf67b137..5c651b9c3b 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -12,5 +12,5 @@ tokio = { version = "1.0", features = ["full"] } [dev-dependencies] anyhow = "1.0" -common-telemetry = { path = "../common/telemetry"} +common-telemetry = { path = "../common/telemetry" } tempdir = "0.3" diff --git a/src/query/src/catalog/schema.rs b/src/query/src/catalog/schema.rs index bbdfaad3b1..5b59939668 100644 --- a/src/query/src/catalog/schema.rs +++ b/src/query/src/catalog/schema.rs @@ -19,15 +19,11 @@ pub trait SchemaProvider: Sync + Send { /// If supported by the implementation, adds a new table to this schema. /// If a table of the same name existed before, it returns "Table already exists" error. - fn register_table(&self, _name: String, _table: TableRef) -> Result> { - todo!(); - } + fn register_table(&self, name: String, table: TableRef) -> Result>; /// If supported by the implementation, removes an existing table from this schema and returns it. /// If no table of that name exists, returns Ok(None). - fn deregister_table(&self, _name: &str) -> Result> { - todo!(); - } + fn deregister_table(&self, name: &str) -> Result>; /// If supported by the implementation, checks the table exist in the schema provider or not. /// If no matched table in the schema provider, return false. 
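With the `todo!()` default bodies removed above, `register_table` and `deregister_table` are now required methods that every `SchemaProvider` implementation must supply. A minimal in-memory sketch of the required shape (only the trait signatures come from this patch; the `MemorySchemaProvider` type and `TableExistsSnafu` error variant are hypothetical, and the remaining trait methods are elided):

    use std::collections::HashMap;
    use std::sync::RwLock;

    struct MemorySchemaProvider {
        tables: RwLock<HashMap<String, TableRef>>,
    }

    impl SchemaProvider for MemorySchemaProvider {
        fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
            let mut tables = self.tables.write().unwrap();
            // Per the contract above, registering a duplicate name is an error.
            ensure!(!tables.contains_key(&name), TableExistsSnafu { name: &name });
            Ok(tables.insert(name, table))
        }

        fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
            // Removing a missing table is not an error; it simply yields Ok(None).
            Ok(self.tables.write().unwrap().remove(name))
        }

        // table_names/table/table_exist elided for brevity.
    }

The `SchemaProviderAdapter` in the catalog_adapter changes below is the patch's actual implementor, delegating both calls to DataFusion's schema provider.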
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 5ef481733d..52a2abe8da 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -124,7 +124,12 @@ impl PhysicalPlanner for DatafusionQueryEngine { })?; Ok(Arc::new(PhysicalPlanAdapter::new( - Arc::new(physical_plan.schema().into()), + Arc::new( + physical_plan + .schema() + .try_into() + .context(error::ConvertSchemaSnafu)?, + ), physical_plan, ))) } diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index e357b28edb..37994eb9ac 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -13,7 +13,7 @@ use datafusion::execution::runtime_env::RuntimeEnv; use snafu::ResultExt; use table::{ table::adapter::{DfTableProviderAdapter, TableAdapter}, - Table, + TableRef, }; use crate::catalog::{schema::SchemaProvider, CatalogListRef, CatalogProvider}; @@ -150,7 +150,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter { name: String, table: Arc, ) -> DataFusionResult>> { - let table = Arc::new(TableAdapter::new(table, self.runtime.clone())); + let table = Arc::new(TableAdapter::new(table, self.runtime.clone())?); match self.schema_provider.register_table(name, table)? { Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), None => Ok(None), @@ -185,35 +185,46 @@ impl SchemaProvider for SchemaProviderAdapter { self.df_schema_provider.table_names() } - fn table(&self, name: &str) -> Option> { + fn table(&self, name: &str) -> Option { self.df_schema_provider.table(name).map(|table_provider| { - Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _ + match table_provider + .as_any() + .downcast_ref::() + { + Some(adapter) => adapter.table(), + None => { + // TODO(yingwen): Avoid panic here. + let adapter = TableAdapter::new(table_provider, self.runtime.clone()) + .expect("convert datafusion table"); + Arc::new(adapter) as _ + } + } }) } - fn register_table( - &self, - name: String, - table: Arc, - ) -> Result>> { - let table_provider = Arc::new(DfTableProviderAdapter::new(table)); + fn register_table(&self, name: String, table: TableRef) -> Result> { + let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone())); Ok(self .df_schema_provider .register_table(name, table_provider) .context(error::DatafusionSnafu { msg: "Fail to register table to datafusion", })? - .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))) + .map(|_| table)) } - fn deregister_table(&self, name: &str) -> Result>> { - Ok(self - .df_schema_provider + fn deregister_table(&self, name: &str) -> Result> { + self.df_schema_provider .deregister_table(name) .context(error::DatafusionSnafu { msg: "Fail to deregister table from datafusion", })? 
- .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)) + .map(|table| { + let adapter = TableAdapter::new(table, self.runtime.clone()) + .context(error::ConvertTableSnafu)?; + Ok(Arc::new(adapter) as _) + }) + .transpose() } fn table_exist(&self, name: &str) -> bool { diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 7ed2d319f1..c9a5293155 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -32,6 +32,18 @@ pub enum InnerError { source: DataFusionError, backtrace: Backtrace, }, + + #[snafu(display("Fail to convert arrow schema, source: {}", source))] + ConvertSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Fail to convert table, source: {}", source))] + ConvertTable { + #[snafu(backtrace)] + source: table::error::Error, + }, } impl ErrorExt for InnerError { @@ -42,7 +54,9 @@ impl ErrorExt for InnerError { // TODO(yingwen): Further categorize datafusion error. Datafusion { .. } => StatusCode::EngineExecuteQuery, // This downcast should not fail in usual case. - PhysicalPlanDowncast { .. } => StatusCode::Unexpected, + PhysicalPlanDowncast { .. } | ConvertSchema { .. } | ConvertTable { .. } => { + StatusCode::Unexpected + } ParseSql { source, .. } => source.status_code(), PlanSql { .. } => StatusCode::PlanQuery, } diff --git a/src/query/src/datafusion/plan_adapter.rs b/src/query/src/datafusion/plan_adapter.rs index f2e64b68ca..7a1e24e606 100644 --- a/src/query/src/datafusion/plan_adapter.rs +++ b/src/query/src/datafusion/plan_adapter.rs @@ -97,7 +97,10 @@ impl PhysicalPlan for PhysicalPlanAdapter { msg: "Fail to execute physical plan", })?; - Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream))) + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + df_stream, + ))) } fn as_any(&self) -> &dyn Any { diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml new file mode 100644 index 0000000000..d7b0881423 --- /dev/null +++ b/src/storage/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "storage" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +arc-swap = "1.0" +async-trait = "0.1" +common-error = { path = "../common/error" } +common-telemetry = { path = "../common/telemetry" } +datatypes = { path = "../datatypes" } +snafu = { version = "0.7", features = ["backtraces"] } +store-api = { path = "../store-api" } +tokio = { version = "1.18", features = ["full"] } diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs new file mode 100644 index 0000000000..abb4d3a8af --- /dev/null +++ b/src/storage/src/engine.rs @@ -0,0 +1,134 @@ +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use async_trait::async_trait; +use common_telemetry::logging::info; +use snafu::ResultExt; +use store_api::storage::{EngineContext, RegionDescriptor, StorageEngine}; + +use crate::error::{self, Error, Result}; +use crate::region::RegionImpl; + +/// [StorageEngine] implementation. 
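+///
+/// A usage sketch mirroring the `test_create_new_region` test below (note that
+/// `RegionDescBuilder` lives in the test-only `test_util` module):
+///
+/// ```ignore
+/// let engine = EngineImpl::new();
+/// let desc = RegionDescBuilder::new("region-0")
+///     .push_key_column(("k1", LogicalTypeId::Int32, false))
+///     .push_value_column(("v1", LogicalTypeId::Float32, true))
+///     .build();
+/// let region = engine
+///     .create_region(&EngineContext::default(), desc)
+///     .await?;
+/// assert_eq!("region-0", region.name());
+/// ```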
+#[derive(Clone)] +pub struct EngineImpl { + inner: Arc, +} + +#[async_trait] +impl StorageEngine for EngineImpl { + type Error = Error; + type Region = RegionImpl; + + async fn open_region(&self, _ctx: &EngineContext, _name: &str) -> Result { + unimplemented!() + } + + async fn close_region(&self, _ctx: &EngineContext, _region: RegionImpl) -> Result<()> { + unimplemented!() + } + + async fn create_region( + &self, + _ctx: &EngineContext, + descriptor: RegionDescriptor, + ) -> Result { + self.inner.create_region(descriptor).await + } + + async fn drop_region(&self, _ctx: &EngineContext, _region: RegionImpl) -> Result<()> { + unimplemented!() + } + + fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result> { + Ok(self.inner.get_region(name)) + } +} + +impl EngineImpl { + pub fn new() -> EngineImpl { + EngineImpl { + inner: Arc::new(EngineInner::default()), + } + } +} + +impl Default for EngineImpl { + fn default() -> Self { + Self::new() + } +} + +type RegionMap = HashMap; + +#[derive(Default)] +struct EngineInner { + regions: RwLock, +} + +impl EngineInner { + async fn create_region(&self, descriptor: RegionDescriptor) -> Result { + { + let regions = self.regions.read().unwrap(); + if let Some(region) = regions.get(&descriptor.name) { + return Ok(region.clone()); + } + } + + let region_name = descriptor.name.clone(); + let metadata = descriptor + .try_into() + .context(error::InvalidRegionDescSnafu { + region: ®ion_name, + })?; + let region = RegionImpl::new(region_name.clone(), metadata); + + { + let mut regions = self.regions.write().unwrap(); + if let Some(region) = regions.get(®ion_name) { + return Ok(region.clone()); + } + + regions.insert(region_name.clone(), region.clone()); + } + // TODO(yingwen): Persist region metadata to log. + + // TODO(yingwen): Impl Debug format for region and print region info briefly in log. 
+ info!("Storage engine create region {}", region_name); + + Ok(region) + } + + fn get_region(&self, name: &str) -> Option { + self.regions.read().unwrap().get(name).cloned() + } +} + +#[cfg(test)] +mod tests { + use datatypes::type_id::LogicalTypeId; + use store_api::storage::Region; + + use super::*; + use crate::test_util::descriptor_util::RegionDescBuilder; + + #[tokio::test] + async fn test_create_new_region() { + let engine = EngineImpl::new(); + + let region_name = "region-0"; + let desc = RegionDescBuilder::new(region_name) + .push_key_column(("k1", LogicalTypeId::Int32, false)) + .push_value_column(("v1", LogicalTypeId::Float32, true)) + .build(); + let ctx = EngineContext::default(); + let region = engine.create_region(&ctx, desc).await.unwrap(); + + assert_eq!(region_name, region.name()); + + let region2 = engine.get_region(&ctx, region_name).unwrap().unwrap(); + assert_eq!(region_name, region2.name()); + + assert!(engine.get_region(&ctx, "no such region").unwrap().is_none()); + } +} diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs new file mode 100644 index 0000000000..da7a3e3189 --- /dev/null +++ b/src/storage/src/error.rs @@ -0,0 +1,75 @@ +use std::any::Any; + +use common_error::prelude::*; + +use crate::metadata::Error as MetadataError; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +pub enum Error { + #[snafu(display("Invalid region descriptor, region: {}, source: {}", region, source))] + InvalidRegionDesc { + region: String, + #[snafu(backtrace)] + source: MetadataError, + }, + + #[snafu(display("Invalid schema of input data, region: {}", region))] + InvalidInputSchema { + region: String, + backtrace: Backtrace, + }, + + #[snafu(display("Missing column {} in write batch", column))] + BatchMissingColumn { + column: String, + backtrace: Backtrace, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + InvalidRegionDesc { .. } | InvalidInputSchema { .. } | BatchMissingColumn { .. } => { + StatusCode::InvalidArguments + } + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +#[cfg(test)] +mod tests { + use snafu::GenerateImplicitData; + + use super::*; + + fn throw_metadata_error() -> std::result::Result<(), MetadataError> { + Err(MetadataError::CfIdExists { + id: 1, + backtrace: Backtrace::generate(), + }) + } + + #[test] + fn test_invalid_region_desc_error() { + let err = throw_metadata_error() + .context(InvalidRegionDescSnafu { region: "hello" }) + .err() + .unwrap(); + + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + assert!(err.backtrace_opt().is_some()); + } +} diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs new file mode 100644 index 0000000000..fc54ad9def --- /dev/null +++ b/src/storage/src/lib.rs @@ -0,0 +1,17 @@ +//! Storage engine implementation. 
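+//!
+//! Module layout introduced by this patch: `engine` implements the
+//! `store-api` storage engine trait, `region` and `region_writer` hold
+//! per-region state and the write path, `memtable` provides the in-memory
+//! write buffer, `write_batch` defines the write requests, `metadata` and
+//! `version` track region schema and its versions, and `sync` contains the
+//! `CowCell` primitive mentioned in the commit message.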
+ +mod engine; +mod error; +mod memtable; +pub mod metadata; +mod region; +mod region_writer; +mod snapshot; +pub mod sync; +mod version; +mod write_batch; + +#[cfg(test)] +mod test_util; + +pub use engine::EngineImpl; diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs new file mode 100644 index 0000000000..adb670a6db --- /dev/null +++ b/src/storage/src/memtable.rs @@ -0,0 +1,99 @@ +mod btree; +mod inserter; +mod schema; + +use std::mem; +use std::sync::Arc; + +use datatypes::vectors::VectorRef; +use store_api::storage::{SequenceNumber, ValueType}; + +use crate::error::Result; +use crate::memtable::btree::BTreeMemtable; +pub use crate::memtable::inserter::Inserter; +pub use crate::memtable::schema::MemtableSchema; + +/// In-memory storage. +pub trait Memtable: Send + Sync { + fn schema(&self) -> &MemtableSchema; + + /// Write key/values to the memtable. + /// + /// # Panics + /// Panics if the schema of the key/value differs from the memtable's schema. + fn write(&self, kvs: &KeyValues) -> Result<()>; + + /// Returns the estimated bytes allocated by this memtable from heap. + fn bytes_allocated(&self) -> usize; +} + +pub type MemtableRef = Arc<dyn Memtable>; + +pub trait MemtableBuilder: Send + Sync { + fn build(&self, schema: MemtableSchema) -> MemtableRef; } + +pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>; + +// TODO(yingwen): Maybe use individual vector for timestamp and version. +/// Key-value pairs in columnar format. +pub struct KeyValues { + pub sequence: SequenceNumber, + pub value_type: ValueType, + /// Start index of these key-value pairs in the batch. + pub start_index_in_batch: usize, + pub keys: Vec<VectorRef>, + pub values: Vec<VectorRef>, } + +impl KeyValues { + // Note that `sequence` is not reset. + fn reset(&mut self, value_type: ValueType, index_in_batch: usize) { + self.value_type = value_type; + self.start_index_in_batch = index_in_batch; + self.keys.clear(); + self.values.clear(); + } +} + +impl KeyValues { + pub fn len(&self) -> usize { + self.keys.first().map(|v| v.len()).unwrap_or_default() + } +} + +pub struct DefaultMemtableBuilder {} + +impl MemtableBuilder for DefaultMemtableBuilder { + fn build(&self, schema: MemtableSchema) -> MemtableRef { + Arc::new(BTreeMemtable::new(schema)) + } +} + +pub struct MemtableSet { + mem: MemtableRef, + // TODO(yingwen): Support multiple immutable memtables. + _immem: Option<MemtableRef>, +} + +impl MemtableSet { + pub fn new(mem: MemtableRef) -> MemtableSet { + MemtableSet { mem, _immem: None } + } + + pub fn mutable_memtable(&self) -> &MemtableRef { + &self.mem + } + + /// Switches the mutable memtable to immutable, returning the old mutable memtable on success. + pub fn _switch_memtable(&mut self, mem: &MemtableRef) -> std::result::Result<MemtableRef, ()> { + match &self._immem { + Some(_) => Err(()), + None => { + let old_mem = mem::replace(&mut self.mem, mem.clone()); + self._immem = Some(old_mem.clone()); + Ok(old_mem) + } + } + } +} diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs new file mode 100644 index 0000000000..4c1382837f --- /dev/null +++ b/src/storage/src/memtable/btree.rs @@ -0,0 +1,136 @@ +use std::cmp::Ordering; +use std::collections::BTreeMap; +use std::sync::RwLock; + +use datatypes::value::Value; +use store_api::storage::{SequenceNumber, ValueType}; + +use crate::error::Result; +use crate::memtable::{KeyValues, Memtable, MemtableSchema}; + +/// A simple memtable implementation based on std's [`BTreeMap`]. +/// +/// Mainly for testing purposes.
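+///
+/// Rows are kept in a `BTreeMap` ordered by `RowKey`: user key ascending,
+/// then `(sequence, index_in_batch, value_type)` descending (see the `Ord`
+/// impl on `RowKey` below), so the newest write to a duplicated key sorts
+/// first.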
+pub struct BTreeMemtable { + schema: MemtableSchema, + map: RwLock>, +} + +impl BTreeMemtable { + pub fn new(schema: MemtableSchema) -> BTreeMemtable { + BTreeMemtable { + schema, + map: RwLock::new(BTreeMap::new()), + } + } +} + +impl Memtable for BTreeMemtable { + fn schema(&self) -> &MemtableSchema { + &self.schema + } + + fn write(&self, kvs: &KeyValues) -> Result<()> { + let mut map = self.map.write().unwrap(); + + let iter_row = IterRow::new(kvs); + for (row_key, row_value) in iter_row { + map.insert(row_key, row_value); + } + + Ok(()) + } + + fn bytes_allocated(&self) -> usize { + unimplemented!() + } +} + +struct IterRow<'a> { + kvs: &'a KeyValues, + index: usize, + len: usize, +} + +impl<'a> IterRow<'a> { + fn new(kvs: &KeyValues) -> IterRow { + IterRow { + kvs, + index: 0, + len: kvs.len(), + } + } + + fn fetch_row(&mut self) -> (RowKey, RowValue) { + let keys = self + .kvs + .keys + .iter() + .map(|vector| vector.get(self.index)) + .collect(); + let row_key = RowKey { + keys, + sequence: self.kvs.sequence, + index_in_batch: self.kvs.start_index_in_batch + self.index, + value_type: self.kvs.value_type, + }; + + let row_value = RowValue { + _values: self + .kvs + .values + .iter() + .map(|vector| vector.get(self.index)) + .collect(), + }; + + (row_key, row_value) + } +} + +impl<'a> Iterator for IterRow<'a> { + type Item = (RowKey, RowValue); + + fn next(&mut self) -> Option<(RowKey, RowValue)> { + if self.index >= self.len { + return None; + } + + Some(self.fetch_row()) + } + + fn size_hint(&self) -> (usize, Option) { + (self.kvs.keys.len(), Some(self.kvs.keys.len())) + } +} + +// TODO(yingwen): Actually the version and timestamp may order desc. +#[derive(PartialEq, Eq)] +struct RowKey { + keys: Vec, + sequence: SequenceNumber, + index_in_batch: usize, + value_type: ValueType, +} + +impl Ord for RowKey { + fn cmp(&self, other: &RowKey) -> Ordering { + // Order by (keys asc, sequence desc, index_in_batch desc, value type desc), though (key, + // sequence, index_in_batch) should be enough to disambiguate. + self.keys + .cmp(&other.keys) + .then_with(|| other.sequence.cmp(&self.sequence)) + .then_with(|| other.index_in_batch.cmp(&self.index_in_batch)) + .then_with(|| other.value_type.cmp(&self.value_type)) + } +} + +impl PartialOrd for RowKey { + fn partial_cmp(&self, other: &RowKey) -> Option { + Some(self.cmp(other)) + } +} + +struct RowValue { + _values: Vec, +} diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs new file mode 100644 index 0000000000..851d758a52 --- /dev/null +++ b/src/storage/src/memtable/inserter.rs @@ -0,0 +1,102 @@ +use std::sync::Arc; + +use datatypes::vectors::{NullVector, VectorRef}; +use snafu::ensure; +use store_api::storage::{ColumnDescriptor, SequenceNumber, ValueType}; + +use crate::error::{self, Result}; +use crate::memtable::{KeyValues, Memtable}; +use crate::write_batch::{Mutation, PutData, WriteBatch}; + +/// Wraps logic of inserting key/values in [WriteBatch] to [Memtable]. +pub struct Inserter { + /// Sequence of the batch to be inserted. + sequence: SequenceNumber, + index_in_batch: usize, +} + +impl Inserter { + pub fn new(sequence: SequenceNumber) -> Inserter { + Inserter { + sequence, + index_in_batch: 0, + } + } + + // TODO(yingwen): Can we take the WriteBatch? + /// Insert write batch into memtable. + /// + /// Won't do schema validation. 
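+    ///
+    /// A sketch of the expected call pattern (the batch is assumed to have been
+    /// validated against the region schema beforehand, as noted above):
+    ///
+    /// ```ignore
+    /// let mut inserter = Inserter::new(sequence);
+    /// inserter.insert_memtable(&write_batch, &*memtable)?;
+    /// ```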
+ pub fn insert_memtable(&mut self, batch: &WriteBatch, memtable: &dyn Memtable) -> Result<()> { + if batch.is_empty() { + return Ok(()); + } + + let schema = memtable.schema(); + // Reusable KeyValues buffer. + let mut kvs = KeyValues { + sequence: self.sequence, + value_type: ValueType::Put, + start_index_in_batch: self.index_in_batch, + keys: Vec::with_capacity(schema.num_row_key_columns()), + values: Vec::with_capacity(schema.num_value_columns()), + }; + + for mutation in batch { + match mutation { + Mutation::Put(put_data) => { + self.put_impl(put_data, memtable, &mut kvs)?; + } + } + } + + Ok(()) + } + + fn put_impl( + &mut self, + put_data: &PutData, + memtable: &dyn Memtable, + kvs: &mut KeyValues, + ) -> Result<()> { + let schema = memtable.schema(); + let num_rows = put_data.num_rows(); + + kvs.reset(ValueType::Put, self.index_in_batch); + + for key_col in schema.row_key_columns() { + clone_put_data_column_to(put_data, &key_col.desc, &mut kvs.keys)?; + } + + for value_col in schema.value_columns() { + clone_put_data_column_to(put_data, &value_col.desc, &mut kvs.values)?; + } + + memtable.write(kvs)?; + + self.index_in_batch += num_rows; + + Ok(()) + } +} + +fn clone_put_data_column_to( + put_data: &PutData, + desc: &ColumnDescriptor, + target: &mut Vec, +) -> Result<()> { + if let Some(vector) = put_data.column_by_name(&desc.name) { + target.push(vector.clone()); + } else { + // The write batch should have been validated before. + ensure!( + desc.is_nullable, + error::BatchMissingColumnSnafu { column: &desc.name } + ); + + let num_rows = put_data.num_rows(); + target.push(Arc::new(NullVector::new(num_rows))); + } + + Ok(()) +} diff --git a/src/storage/src/memtable/schema.rs b/src/storage/src/memtable/schema.rs new file mode 100644 index 0000000000..2b2dc204e6 --- /dev/null +++ b/src/storage/src/memtable/schema.rs @@ -0,0 +1,31 @@ +use crate::metadata::{ColumnMetadata, ColumnsRowKeyMetadataRef}; + +pub struct MemtableSchema { + columns_row_key: ColumnsRowKeyMetadataRef, +} + +impl MemtableSchema { + pub fn new(columns_row_key: ColumnsRowKeyMetadataRef) -> MemtableSchema { + MemtableSchema { columns_row_key } + } + + #[inline] + pub fn row_key_columns(&self) -> impl Iterator { + self.columns_row_key.iter_row_key_columns() + } + + #[inline] + pub fn value_columns(&self) -> impl Iterator { + self.columns_row_key.iter_value_columns() + } + + #[inline] + pub fn num_row_key_columns(&self) -> usize { + self.columns_row_key.num_row_key_columns() + } + + #[inline] + pub fn num_value_columns(&self) -> usize { + self.columns_row_key.num_value_columns() + } +} diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs new file mode 100644 index 0000000000..d222d7b14a --- /dev/null +++ b/src/storage/src/metadata.rs @@ -0,0 +1,460 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use common_error::prelude::*; +use datatypes::data_type::ConcreteDataType; +use snafu::ensure; +use store_api::storage::{ + consts, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId, + ColumnId, ColumnSchema, RegionDescriptor, RegionMeta, RowKeyDescriptor, Schema, SchemaRef, +}; + +/// Error for handling metadata. 
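+///
+/// These errors are raised while building a [RegionMetadata] from a
+/// [RegionDescriptor], e.g. when a column name, a column family name or a
+/// column family id is duplicated.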
+#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Column name already exists, name: {}", name))] + ColNameExists { name: String, backtrace: Backtrace }, + + #[snafu(display("Column family name already exists, name: {}", name))] + CfNameExists { name: String, backtrace: Backtrace }, + + #[snafu(display("Column family id already exists, id: {}", id))] + CfIdExists { id: ColumnId, backtrace: Backtrace }, +} + +pub type Result = std::result::Result; + +/// Implementation of [RegionMeta]. +/// +/// Holds a snapshot of region metadata. +pub struct RegionMetaImpl { + metadata: RegionMetadataRef, +} + +impl RegionMetaImpl { + pub fn new(metadata: RegionMetadataRef) -> RegionMetaImpl { + RegionMetaImpl { metadata } + } +} + +impl RegionMeta for RegionMetaImpl { + fn schema(&self) -> &SchemaRef { + &self.metadata.schema + } +} + +pub type VersionNumber = u32; + +// TODO(yingwen): Make some fields of metadata private. + +/// In memory metadata of region. +#[derive(Clone)] +pub struct RegionMetadata { + /// Schema of the region. + /// + /// Holding a [SchemaRef] to allow converting into `SchemaRef`/`arrow::SchemaRef` + /// conveniently. The fields order in `SchemaRef` **must** be consistent with + /// columns order in [ColumnsMetadata] to ensure the projection index of a field + /// is correct. + pub schema: SchemaRef, + pub columns_row_key: ColumnsRowKeyMetadataRef, + pub column_families: ColumnFamiliesMetadata, + /// Version of the metadata. Version is set to zero initially and bumped once the + /// metadata have been altered. + pub version: VersionNumber, +} + +pub type RegionMetadataRef = Arc; + +#[derive(Clone)] +pub struct ColumnMetadata { + pub cf_id: ColumnFamilyId, + pub desc: ColumnDescriptor, +} + +#[derive(Clone)] +pub struct ColumnsMetadata { + /// All columns, in `(key columns, timestamp, [version,] value columns)` order. + /// + /// Columns order should be consistent with fields order in [SchemaRef]. + pub columns: Vec, + /// Maps column name to index of columns, used to fast lookup column by name. + pub name_to_col_index: HashMap, +} + +#[derive(Default, Clone)] +pub struct RowKeyMetadata { + /// Exclusive end index of row key columns. + row_key_end: usize, + /// Index of timestamp key column. + pub timestamp_key_index: usize, + /// If version column is enabled, then the last column of key columns is a + /// version column. + pub enable_version_column: bool, +} + +#[derive(Clone)] +pub struct ColumnsRowKeyMetadata { + columns: ColumnsMetadata, + row_key: RowKeyMetadata, +} + +impl ColumnsRowKeyMetadata { + pub fn iter_row_key_columns(&self) -> impl Iterator { + self.columns.columns.iter().take(self.row_key.row_key_end) + } + + pub fn iter_value_columns(&self) -> impl Iterator { + self.columns.columns.iter().skip(self.row_key.row_key_end) + } + + #[inline] + pub fn num_row_key_columns(&self) -> usize { + self.row_key.row_key_end + } + + #[inline] + pub fn num_value_columns(&self) -> usize { + self.columns.columns.len() - self.row_key.row_key_end + } +} + +pub type ColumnsRowKeyMetadataRef = Arc; + +#[derive(Clone)] +pub struct ColumnFamiliesMetadata { + /// Map column family id to column family metadata. + id_to_cfs: HashMap, +} + +impl ColumnFamiliesMetadata { + pub fn cf_by_id(&self, cf_id: ColumnFamilyId) -> Option<&ColumnFamilyMetadata> { + self.id_to_cfs.get(&cf_id) + } +} + +#[derive(Clone)] +pub struct ColumnFamilyMetadata { + /// Column family name. + pub name: String, + pub cf_id: ColumnFamilyId, + /// Inclusive start index of columns in the column family. 
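+    ///
+    /// Together with `column_index_end` below, this means the columns of this
+    /// column family are `columns[column_index_start..column_index_end]` in
+    /// [ColumnsMetadata].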
+ pub column_index_start: usize, + /// Exclusive end index of columns in the column family. + pub column_index_end: usize, +} + +impl TryFrom for RegionMetadata { + type Error = Error; + + fn try_from(desc: RegionDescriptor) -> Result { + // Doesn't set version explicitly here, because this is a new region meta + // created from descriptor, using initial version is reasonable. + let mut builder = RegionMetadataBuilder::new() + .row_key(desc.row_key)? + .add_column_family(desc.default_cf)?; + for cf in desc.extra_cfs { + builder = builder.add_column_family(cf)?; + } + + Ok(builder.build()) + } +} + +#[derive(Default)] +struct RegionMetadataBuilder { + columns: Vec, + column_schemas: Vec, + name_to_col_index: HashMap, + + row_key: RowKeyMetadata, + + id_to_cfs: HashMap, + cf_names: HashSet, +} + +impl RegionMetadataBuilder { + fn new() -> RegionMetadataBuilder { + RegionMetadataBuilder::default() + } + + fn row_key(mut self, key: RowKeyDescriptor) -> Result { + for col in key.columns { + self.push_row_key_column(col)?; + } + + // TODO(yingwen): Validate this is a timestamp column. + let timestamp_key_index = self.columns.len(); + self.push_row_key_column(key.timestamp)?; + + if key.enable_version_column { + // TODO(yingwen): Validate that version column must be uint64 column. + let version_col = version_column_desc(); + self.push_row_key_column(version_col)?; + } + + let row_key_end = self.columns.len(); + + self.row_key = RowKeyMetadata { + row_key_end, + timestamp_key_index, + enable_version_column: key.enable_version_column, + }; + + Ok(self) + } + + fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result { + ensure!( + !self.id_to_cfs.contains_key(&cf.cf_id), + CfIdExistsSnafu { id: cf.cf_id } + ); + + ensure!( + !self.cf_names.contains(&cf.name), + CfNameExistsSnafu { name: &cf.name } + ); + + let column_index_start = self.columns.len(); + let column_index_end = column_index_start + cf.columns.len(); + for col in cf.columns { + self.push_value_column(cf.cf_id, col)?; + } + + let cf_meta = ColumnFamilyMetadata { + name: cf.name.clone(), + cf_id: cf.cf_id, + column_index_start, + column_index_end, + }; + + self.id_to_cfs.insert(cf.cf_id, cf_meta); + self.cf_names.insert(cf.name); + + Ok(self) + } + + fn build(self) -> RegionMetadata { + let schema = Arc::new(Schema::new(self.column_schemas)); + let columns = ColumnsMetadata { + columns: self.columns, + name_to_col_index: self.name_to_col_index, + }; + let columns_row_key = Arc::new(ColumnsRowKeyMetadata { + columns, + row_key: self.row_key, + }); + + RegionMetadata { + schema, + columns_row_key, + column_families: ColumnFamiliesMetadata { + id_to_cfs: self.id_to_cfs, + }, + version: 0, + } + } + + // Helper methods: + + fn push_row_key_column(&mut self, desc: ColumnDescriptor) -> Result<()> { + self.push_value_column(consts::KEY_CF_ID, desc) + } + + fn push_value_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> { + ensure!( + !self.name_to_col_index.contains_key(&desc.name), + ColNameExistsSnafu { name: &desc.name } + ); + + let column_schema = ColumnSchema::from(&desc); + + let column_name = desc.name.clone(); + let meta = ColumnMetadata { cf_id, desc }; + + // TODO(yingwen): Store cf_id to metadata in field. 
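+        // Record the column at the next index and remember the name -> index
+        // mapping for constant-time lookup by name.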
+ + let column_index = self.columns.len(); + self.columns.push(meta); + self.column_schemas.push(column_schema); + self.name_to_col_index.insert(column_name, column_index); + + Ok(()) + } +} + +fn version_column_desc() -> ColumnDescriptor { + ColumnDescriptorBuilder::new( + consts::VERSION_COLUMN_ID, + consts::VERSION_COLUMN_NAME.to_string(), + ConcreteDataType::uint64_datatype(), + ) + .is_nullable(false) + .build() +} + +// TODO(yingwen): Add tests for using invalid row_key/cf to build metadata. +#[cfg(test)] +mod tests { + use datatypes::type_id::LogicalTypeId; + use store_api::storage::{ + ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, RowKeyDescriptorBuilder, + }; + + use super::*; + use crate::test_util::descriptor_util::RegionDescBuilder; + use crate::test_util::schema_util; + + #[test] + fn test_descriptor_to_region_metadata() { + let desc = RegionDescBuilder::new("region-0") + .timestamp(("ts", LogicalTypeId::UInt64, false)) + .enable_version_column(false) + .push_key_column(("k1", LogicalTypeId::Int32, false)) + .push_value_column(("v1", LogicalTypeId::Float32, true)) + .build(); + + let expect_schema = schema_util::new_schema_ref(&[ + ("k1", LogicalTypeId::Int32, false), + ("ts", LogicalTypeId::UInt64, false), + ("v1", LogicalTypeId::Float32, true), + ]); + + let metadata = RegionMetadata::try_from(desc).unwrap(); + assert_eq!(expect_schema, metadata.schema); + assert_eq!(2, metadata.columns_row_key.num_row_key_columns()); + assert_eq!(1, metadata.columns_row_key.num_value_columns()); + } + + #[test] + fn test_build_empty_region_metadata() { + let metadata = RegionMetadataBuilder::default().build(); + assert!(metadata.schema.column_schemas().is_empty()); + + assert!(metadata.columns_row_key.columns.columns.is_empty()); + assert_eq!(0, metadata.columns_row_key.num_row_key_columns()); + assert!(metadata + .columns_row_key + .iter_row_key_columns() + .next() + .is_none()); + assert_eq!(0, metadata.columns_row_key.num_value_columns()); + assert!(metadata + .columns_row_key + .iter_value_columns() + .next() + .is_none()); + + assert!(metadata.column_families.id_to_cfs.is_empty()); + + assert_eq!(0, metadata.version); + } + + fn new_metadata(enable_version_column: bool) -> RegionMetadata { + let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::int64_datatype()) + .is_nullable(false) + .build(); + let row_key = RowKeyDescriptorBuilder::new(timestamp) + .push_column( + ColumnDescriptorBuilder::new(3, "k1", ConcreteDataType::int64_datatype()) + .is_nullable(false) + .build(), + ) + .enable_version_column(enable_version_column) + .build(); + let cf = ColumnFamilyDescriptorBuilder::new() + .push_column( + ColumnDescriptorBuilder::new(4, "v1", ConcreteDataType::int64_datatype()).build(), + ) + .build(); + RegionMetadataBuilder::new() + .row_key(row_key) + .unwrap() + .add_column_family(cf) + .unwrap() + .build() + } + + #[test] + fn test_build_metedata_disable_version() { + let metadata = new_metadata(false); + + let expect_schema = schema_util::new_schema_ref(&[ + ("k1", LogicalTypeId::Int64, false), + ("ts", LogicalTypeId::Int64, false), + ("v1", LogicalTypeId::Int64, true), + ]); + + assert_eq!(expect_schema, metadata.schema); + + // 3 columns + assert_eq!(3, metadata.columns_row_key.columns.columns.len()); + // 2 row key columns + assert_eq!(2, metadata.columns_row_key.num_row_key_columns()); + let row_key_names: Vec<_> = metadata + .columns_row_key + .iter_row_key_columns() + .map(|column| &column.desc.name) + .collect(); + assert_eq!(["k1", "ts"], 
&row_key_names[..]); + // 1 value column + assert_eq!(1, metadata.columns_row_key.num_value_columns()); + let value_names: Vec<_> = metadata + .columns_row_key + .iter_value_columns() + .map(|column| &column.desc.name) + .collect(); + assert_eq!(["v1"], &value_names[..]); + // Check timestamp index. + assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index); + // Check version column. + assert!(!metadata.columns_row_key.row_key.enable_version_column); + + assert!(metadata + .column_families + .cf_by_id(consts::DEFAULT_CF_ID) + .is_some()); + + assert_eq!(0, metadata.version); + } + + #[test] + fn test_build_metedata_enable_version() { + let metadata = new_metadata(true); + + let expect_schema = schema_util::new_schema_ref(&[ + ("k1", LogicalTypeId::Int64, false), + ("ts", LogicalTypeId::Int64, false), + (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false), + ("v1", LogicalTypeId::Int64, true), + ]); + + assert_eq!(expect_schema, metadata.schema); + + // 4 columns + assert_eq!(4, metadata.columns_row_key.columns.columns.len()); + // 3 row key columns + assert_eq!(3, metadata.columns_row_key.num_row_key_columns()); + let row_key_names: Vec<_> = metadata + .columns_row_key + .iter_row_key_columns() + .map(|column| &column.desc.name) + .collect(); + assert_eq!( + ["k1", "ts", consts::VERSION_COLUMN_NAME], + &row_key_names[..] + ); + // 1 value column + assert_eq!(1, metadata.columns_row_key.num_value_columns()); + let value_names: Vec<_> = metadata + .columns_row_key + .iter_value_columns() + .map(|column| &column.desc.name) + .collect(); + assert_eq!(["v1"], &value_names[..]); + // Check timestamp index. + assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index); + // Check version column. + assert!(metadata.columns_row_key.row_key.enable_version_column); + } +} diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs new file mode 100644 index 0000000000..b3bfdfff7b --- /dev/null +++ b/src/storage/src/region.rs @@ -0,0 +1,122 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use snafu::ensure; +use store_api::storage::{ReadContext, Region, RegionMeta, WriteContext, WriteResponse}; +use tokio::sync::Mutex; + +use crate::error::{self, Error, Result}; +use crate::memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableSchema, MemtableSet}; +use crate::metadata::{RegionMetaImpl, RegionMetadata}; +use crate::region_writer::RegionWriter; +use crate::snapshot::SnapshotImpl; +use crate::version::{VersionControl, VersionControlRef}; +use crate::write_batch::WriteBatch; + +/// [Region] implementation. 
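+///
+/// A hypothetical usage sketch (`metadata`, `ctx` and `batch` are assumed to
+/// be built elsewhere, e.g. as in the tests below):
+///
+/// ```ignore
+/// let region = RegionImpl::new("region-0".to_string(), metadata);
+/// let response = region.write(&ctx, batch).await?;
+/// ```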
+#[derive(Clone)] +pub struct RegionImpl { + inner: Arc, +} + +#[async_trait] +impl Region for RegionImpl { + type Error = Error; + type Meta = RegionMetaImpl; + type WriteRequest = WriteBatch; + type Snapshot = SnapshotImpl; + + fn name(&self) -> &str { + &self.inner.name + } + + fn in_memory_metadata(&self) -> RegionMetaImpl { + self.inner.in_memory_metadata() + } + + async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result { + self.inner.write(ctx, request).await + } + + fn snapshot(&self, _ctx: &ReadContext) -> Result { + unimplemented!() + } +} + +impl RegionImpl { + pub fn new(name: String, metadata: RegionMetadata) -> RegionImpl { + let memtable_builder = Arc::new(DefaultMemtableBuilder {}); + let memtable_schema = MemtableSchema::new(metadata.columns_row_key.clone()); + let mem = memtable_builder.build(memtable_schema); + let memtables = MemtableSet::new(mem); + + let version = VersionControl::new(metadata, memtables); + let inner = Arc::new(RegionInner { + name, + version: Arc::new(version), + writer: Mutex::new(RegionWriter::new(memtable_builder)), + }); + + RegionImpl { inner } + } +} + +struct RegionInner { + name: String, + version: VersionControlRef, + writer: Mutex, +} + +impl RegionInner { + fn in_memory_metadata(&self) -> RegionMetaImpl { + let metadata = self.version.metadata(); + + RegionMetaImpl::new(metadata) + } + + async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result { + let metadata = self.in_memory_metadata(); + let schema = metadata.schema(); + // Only compare column schemas. + ensure!( + schema.column_schemas() == request.schema().column_schemas(), + error::InvalidInputSchemaSnafu { region: &self.name } + ); + + // Now altering schema is not allowed, so it is safe to validate schema outside of the lock. 
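+        // The region writer is guarded by a mutex, so writes to the memtable
+        // are applied by one task at a time.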
+ let mut writer = self.writer.lock().await; + writer.write(ctx, &self.version, request).await + } +} + +#[cfg(test)] +mod tests { + use datatypes::type_id::LogicalTypeId; + use store_api::storage::consts; + + use super::*; + use crate::test_util::descriptor_util::RegionDescBuilder; + use crate::test_util::schema_util; + + #[test] + fn test_new_region() { + let region_name = "region-0"; + let desc = RegionDescBuilder::new(region_name) + .push_key_column(("k1", LogicalTypeId::Int32, false)) + .push_value_column(("v1", LogicalTypeId::Float32, true)) + .build(); + let metadata = desc.try_into().unwrap(); + + let region = RegionImpl::new(region_name.to_string(), metadata); + + let expect_schema = schema_util::new_schema_ref(&[ + ("k1", LogicalTypeId::Int32, false), + ("timestamp", LogicalTypeId::UInt64, false), + (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false), + ("v1", LogicalTypeId::Float32, true), + ]); + + assert_eq!(region_name, region.name()); + assert_eq!(expect_schema, *region.in_memory_metadata().schema()); + } +} diff --git a/src/storage/src/region_writer.rs b/src/storage/src/region_writer.rs new file mode 100644 index 0000000000..3d4e7eb2c4 --- /dev/null +++ b/src/storage/src/region_writer.rs @@ -0,0 +1,43 @@ +use store_api::storage::{SequenceNumber, WriteContext, WriteResponse}; + +use crate::error::Result; +use crate::memtable::{Inserter, MemtableBuilderRef}; +use crate::version::VersionControlRef; +use crate::write_batch::WriteBatch; + +pub struct RegionWriter { + _memtable_builder: MemtableBuilderRef, + last_sequence: SequenceNumber, +} + +impl RegionWriter { + pub fn new(_memtable_builder: MemtableBuilderRef) -> RegionWriter { + RegionWriter { + _memtable_builder, + last_sequence: 0, + } + } + + // TODO(yingwen): Support group commit so we can avoid taking mutable reference. + /// Write `WriteBatch` to region, now the schema of batch needs to be validated outside. + pub async fn write( + &mut self, + _ctx: &WriteContext, + version_control: &VersionControlRef, + request: WriteBatch, + ) -> Result { + // Mutable reference of writer ensure no other reference of this writer can modify + // the version control (write is exclusive). + + // TODO(yingwen): Write wal and get sequence. + let version = version_control.current(); + let memtables = &version.memtables; + + let mem = memtables.mutable_memtable(); + self.last_sequence += 1; + let mut inserter = Inserter::new(self.last_sequence); + inserter.insert_memtable(&request, &**mem)?; + + Ok(WriteResponse {}) + } +} diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs new file mode 100644 index 0000000000..ff76c4947e --- /dev/null +++ b/src/storage/src/snapshot.rs @@ -0,0 +1,26 @@ +use async_trait::async_trait; +use store_api::storage::{ + GetRequest, GetResponse, ReadContext, ScanRequest, ScanResponse, SchemaRef, Snapshot, +}; + +use crate::error::{Error, Result}; + +/// [Snapshot] implementation. +pub struct SnapshotImpl {} + +#[async_trait] +impl Snapshot for SnapshotImpl { + type Error = Error; + + fn schema(&self) -> &SchemaRef { + unimplemented!() + } + + async fn scan(&self, _ctx: &ReadContext, _request: ScanRequest) -> Result { + unimplemented!() + } + + async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result { + unimplemented!() + } +} diff --git a/src/storage/src/sync.rs b/src/storage/src/sync.rs new file mode 100644 index 0000000000..3455fe2ade --- /dev/null +++ b/src/storage/src/sync.rs @@ -0,0 +1,125 @@ +//! 
Synchronization utilities.

+use std::ops::{Deref, DerefMut};
+use std::sync::{Arc, Mutex, MutexGuard};
+
+use arc_swap::ArcSwap;
+
+/// A thread safe clone-on-write cell.
+///
+/// Each read returns a read only clone of the internal data and won't block
+/// writers. Writing to the cell's data needs to acquire a write txn (lock)
+/// first, and modifications are not visible to others until the txn is committed.
+#[derive(Debug)]
+pub struct CowCell<T> {
+    inner: ArcSwap<T>,
+    mutex: Mutex<()>,
+}
+
+impl<T> CowCell<T> {
+    /// Create a new cell.
+    pub fn new(data: T) -> CowCell<T> {
+        CowCell {
+            inner: ArcSwap::from(Arc::new(data)),
+            mutex: Mutex::new(()),
+        }
+    }
+
+    /// Get a read only clone from the cell.
+    pub fn get(&self) -> Arc<T> {
+        self.inner.load_full()
+    }
+}
+
+impl<T: Clone> CowCell<T> {
+    /// Acquire a write txn, blocking the current thread.
+    ///
+    /// Note that this will clone the inner data.
+    pub fn lock(&self) -> TxnGuard<T> {
+        let _guard = self.mutex.lock().unwrap();
+        // Acquire a clone of the data inside the lock.
+        let data = (*self.get()).clone();
+
+        TxnGuard {
+            inner: &self.inner,
+            data,
+            _guard,
+        }
+    }
+}
+
+/// A RAII implementation of a write transaction of the [CowCell].
+///
+/// When this txn is dropped (falls out of scope or is committed), the lock will be
+/// unlocked, but updates to the content won't be visible unless the txn is committed.
+#[must_use = "if unused the CowCell will immediately unlock"]
+pub struct TxnGuard<'a, T: Clone> {
+    inner: &'a ArcSwap<T>,
+    data: T,
+    _guard: MutexGuard<'a, ()>,
+}
+
+impl<T: Clone> TxnGuard<'_, T> {
+    /// Commit updates to the cell and release the lock.
+    pub fn commit(self) {
+        let data = Arc::new(self.data);
+        self.inner.store(data);
+    }
+}
+
+impl<T: Clone> Deref for TxnGuard<'_, T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        &self.data
+    }
+}
+
+impl<T: Clone> DerefMut for TxnGuard<'_, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        &mut self.data
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_cow_cell_commit() {
+        let cell = CowCell::new(10);
+        assert_eq!(10, *cell.get());
+
+        let mut data = cell.lock();
+        assert_eq!(10, *data);
+
+        // It's okay to get a read only clone from the cell while the lock is held.
+        assert_eq!(10, *cell.get());
+
+        *data += 2;
+
+        assert_eq!(*data, 12);
+        // The modification is still not visible.
+        assert_eq!(10, *cell.get());
+
+        // Commit the txn.
+        data.commit();
+
+        // Once the guard is committed, the new data is visible.
+        assert_eq!(12, *cell.get());
+    }
+
+    #[test]
+    fn test_cow_cell_cancel() {
+        let cell = CowCell::new(10);
+        assert_eq!(10, *cell.get());
+
+        {
+            let mut data = cell.lock();
+            *data += 2;
+        }
+
+        // The update is not committed, so it should not be visible.
+        assert_eq!(10, *cell.get());
+    }
+}
diff --git a/src/storage/src/test_util.rs b/src/storage/src/test_util.rs
new file mode 100644
index 0000000000..dcd7bb18f2
--- /dev/null
+++ b/src/storage/src/test_util.rs
@@ -0,0 +1,3 @@
+pub mod descriptor_util;
+pub mod schema_util;
+pub mod write_batch_util;
diff --git a/src/storage/src/test_util/descriptor_util.rs b/src/storage/src/test_util/descriptor_util.rs
new file mode 100644
index 0000000000..bfe642f874
--- /dev/null
+++ b/src/storage/src/test_util/descriptor_util.rs
@@ -0,0 +1,78 @@
+use datatypes::prelude::ConcreteDataType;
+use store_api::storage::{
+    ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId,
+    RegionDescriptor, RowKeyDescriptorBuilder,
+};
+
+use crate::test_util::schema_util::ColumnDef;
+
+/// A RegionDescriptor builder for tests.
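+///
+/// For example (as the region tests in this patch do):
+///
+/// ```ignore
+/// let desc = RegionDescBuilder::new("region-0")
+///     .push_key_column(("k1", LogicalTypeId::Int32, false))
+///     .push_value_column(("v1", LogicalTypeId::Float32, true))
+///     .build();
+/// ```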
+pub struct RegionDescBuilder { + name: String, + last_column_id: ColumnId, + key_builder: RowKeyDescriptorBuilder, + default_cf_builder: ColumnFamilyDescriptorBuilder, +} + +impl RegionDescBuilder { + pub fn new>(name: T) -> Self { + let key_builder = RowKeyDescriptorBuilder::new( + ColumnDescriptorBuilder::new(2, "timestamp", ConcreteDataType::uint64_datatype()) + .is_nullable(false) + .build(), + ); + + Self { + name: name.into(), + last_column_id: 2, + key_builder, + default_cf_builder: ColumnFamilyDescriptorBuilder::new(), + } + } + + // This will reset the row key builder, so should be called before `push_key_column()` + // and `enable_version_column()`, or just call after `new()`. + pub fn timestamp(mut self, column_def: ColumnDef) -> Self { + let builder = RowKeyDescriptorBuilder::new(self.new_column(column_def)); + self.key_builder = builder; + self + } + + pub fn enable_version_column(mut self, enable: bool) -> Self { + self.key_builder = self.key_builder.enable_version_column(enable); + self + } + + pub fn push_key_column(mut self, column_def: ColumnDef) -> Self { + let column = self.new_column(column_def); + self.key_builder = self.key_builder.push_column(column); + self + } + + pub fn push_value_column(mut self, column_def: ColumnDef) -> Self { + let column = self.new_column(column_def); + self.default_cf_builder = self.default_cf_builder.push_column(column); + self + } + + pub fn build(self) -> RegionDescriptor { + RegionDescriptor { + name: self.name, + row_key: self.key_builder.build(), + default_cf: self.default_cf_builder.build(), + extra_cfs: Vec::new(), + } + } + + fn alloc_column_id(&mut self) -> ColumnId { + self.last_column_id += 1; + self.last_column_id + } + + fn new_column(&mut self, column_def: ColumnDef) -> ColumnDescriptor { + let datatype = column_def.1.data_type(); + ColumnDescriptorBuilder::new(self.alloc_column_id(), column_def.0, datatype) + .is_nullable(column_def.2) + .build() + } +} diff --git a/src/storage/src/test_util/schema_util.rs b/src/storage/src/test_util/schema_util.rs new file mode 100644 index 0000000000..482a90caa1 --- /dev/null +++ b/src/storage/src/test_util/schema_util.rs @@ -0,0 +1,23 @@ +use std::sync::Arc; + +use datatypes::prelude::*; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; + +/// Column definition: (name, datatype, is_nullable) +pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool); + +pub fn new_schema(column_defs: &[ColumnDef]) -> Schema { + let column_schemas = column_defs + .iter() + .map(|column_def| { + let datatype = column_def.1.data_type(); + ColumnSchema::new(column_def.0, datatype, column_def.2) + }) + .collect(); + + Schema::new(column_schemas) +} + +pub fn new_schema_ref(column_defs: &[ColumnDef]) -> SchemaRef { + Arc::new(new_schema(column_defs)) +} diff --git a/src/storage/src/test_util/write_batch_util.rs b/src/storage/src/test_util/write_batch_util.rs new file mode 100644 index 0000000000..20f59f0c99 --- /dev/null +++ b/src/storage/src/test_util/write_batch_util.rs @@ -0,0 +1,10 @@ +use store_api::storage::WriteRequest; + +use crate::test_util::schema_util::{self, ColumnDef}; +use crate::write_batch::WriteBatch; + +pub fn new_write_batch(column_defs: &[ColumnDef]) -> WriteBatch { + let schema = schema_util::new_schema_ref(column_defs); + + WriteBatch::new(schema) +} diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs new file mode 100644 index 0000000000..6ca8d6ce37 --- /dev/null +++ b/src/storage/src/version.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use 
crate::memtable::MemtableSet;
+use crate::metadata::{RegionMetadata, RegionMetadataRef};
+use crate::sync::CowCell;
+
+/// Controls the version of a region's in-memory state.
+pub struct VersionControl {
+    version: CowCell<Version>,
+}
+
+impl VersionControl {
+    /// Construct a new version control from `metadata`.
+    pub fn new(metadata: RegionMetadata, memtables: MemtableSet) -> VersionControl {
+        VersionControl {
+            version: CowCell::new(Version::new(metadata, memtables)),
+        }
+    }
+
+    /// Returns the current version.
+    pub fn current(&self) -> VersionRef {
+        self.version.get()
+    }
+
+    /// Metadata of the current version.
+    pub fn metadata(&self) -> RegionMetadataRef {
+        let version = self.current();
+        version.metadata.clone()
+    }
+}
+
+pub type VersionControlRef = Arc<VersionControl>;
+pub type VersionRef = Arc<Version>;
+
+// To read data from a version, one needs to
+// 1. acquire the version first,
+// 2. then acquire the sequence.
+//
+// Reason: a flush may remove data with old sequences, so the version must be
+// acquired first.
+
+/// Version contains metadata and state of a region.
+pub struct Version {
+    /// Metadata of the region. Altering metadata isn't frequent, so the metadata
+    /// is stored in an `Arc` to allow sharing it and reusing it when creating a
+    /// new `Version`.
+    metadata: RegionMetadataRef,
+    pub memtables: MemtableSet,
+    // TODO(yingwen): Also need to store the last sequence in this version when
+    // switching versions, so we know the newest data readable from this version.
+}
+
+impl Version {
+    pub fn new(metadata: RegionMetadata, memtables: MemtableSet) -> Version {
+        Version {
+            metadata: Arc::new(metadata),
+            memtables,
+        }
+    }
+}
diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs
new file mode 100644
index 0000000000..22e0bf8daa
--- /dev/null
+++ b/src/storage/src/write_batch.rs
@@ -0,0 +1,442 @@
+use std::any::Any;
+use std::collections::HashMap;
+use std::slice;
+
+use common_error::prelude::*;
+use datatypes::data_type::ConcreteDataType;
+use datatypes::schema::SchemaRef;
+use datatypes::vectors::VectorRef;
+use snafu::ensure;
+use store_api::storage::{consts, PutOperation, WriteRequest};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Duplicate column {} in same request", name))]
+    DuplicateColumn { name: String, backtrace: Backtrace },
+
+    #[snafu(display("Missing column {} in request", name))]
+    MissingColumn { name: String, backtrace: Backtrace },
+
+    #[snafu(display(
+        "Type of column {} does not match type in schema, expect {:?}, given {:?}",
+        name,
+        expect,
+        given
+    ))]
+    TypeMismatch {
+        name: String,
+        expect: ConcreteDataType,
+        given: ConcreteDataType,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Column {} is not null but input has null", name))]
+    HasNull { name: String, backtrace: Backtrace },
+
+    #[snafu(display("Unknown column {}", name))]
+    UnknownColumn { name: String, backtrace: Backtrace },
+
+    #[snafu(display(
+        "Length of column {} not equals to other columns, expect {}, given {}",
+        name,
+        expect,
+        given
+    ))]
+    LenNotEquals {
+        name: String,
+        expect: usize,
+        given: usize,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display(
+        "Request is too large, max is {}, current is {}",
+        MAX_BATCH_SIZE,
+        num_rows
+    ))]
+    RequestTooLarge {
+        num_rows: usize,
+        backtrace: Backtrace,
+    },
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+/// Max number of updates of a write batch.
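+///
+/// "Updates" are counted in rows: `add_num_rows` below sums the row counts of
+/// all mutations in the batch and rejects the put once the sum exceeds this limit.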
+const MAX_BATCH_SIZE: usize = 1_000_000; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + StatusCode::InvalidArguments + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +/// Implementation of [WriteRequest]. +pub struct WriteBatch { + schema: SchemaRef, + mutations: Vec, + num_rows: usize, +} + +impl WriteRequest for WriteBatch { + type Error = Error; + type PutOp = PutData; + + fn new(schema: SchemaRef) -> Self { + Self { + schema, + mutations: Vec::new(), + num_rows: 0, + } + } + + fn put(&mut self, data: PutData) -> Result<()> { + if data.is_empty() { + return Ok(()); + } + + self.validate_put(&data)?; + + self.add_num_rows(data.num_rows())?; + self.mutations.push(Mutation::Put(data)); + + Ok(()) + } +} + +// WriteBatch pub methods. +impl WriteBatch { + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + pub fn iter(&self) -> slice::Iter<'_, Mutation> { + self.mutations.iter() + } + + pub fn is_empty(&self) -> bool { + self.mutations.is_empty() + } +} + +pub enum Mutation { + Put(PutData), +} + +#[derive(Default)] +pub struct PutData { + columns: HashMap, +} + +impl PutOperation for PutData { + type Error = Error; + + fn new() -> PutData { + PutData::default() + } + + fn with_num_columns(num_columns: usize) -> PutData { + PutData { + columns: HashMap::with_capacity(num_columns), + } + } + + fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<()> { + self.add_column_by_name(name, vector) + } + + fn add_version_column(&mut self, vector: VectorRef) -> Result<()> { + // TODO(yingwen): Maybe ensure that version column must be a uint64 vector. + self.add_column_by_name(consts::VERSION_COLUMN_NAME, vector) + } + + fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<()> { + self.add_column_by_name(name, vector) + } +} + +// PutData pub methods. +impl PutData { + pub fn column_by_name(&self, name: &str) -> Option<&VectorRef> { + self.columns.get(name) + } + + /// Returns number of rows in data. + pub fn num_rows(&self) -> usize { + self.columns + .values() + .next() + .map(|col| col.len()) + .unwrap_or(0) + } + + /// Returns true if no rows in data. + /// + /// `PutData` with empty column will also be considered as empty. + pub fn is_empty(&self) -> bool { + self.num_rows() == 0 + } +} + +impl WriteBatch { + fn validate_put(&self, data: &PutData) -> Result<()> { + for column_schema in self.schema.column_schemas() { + match data.column_by_name(&column_schema.name) { + Some(col) => { + ensure!( + col.data_type() == column_schema.data_type, + TypeMismatchSnafu { + name: &column_schema.name, + expect: column_schema.data_type.clone(), + given: col.data_type(), + } + ); + + ensure!( + column_schema.is_nullable || col.null_count() == 0, + HasNullSnafu { + name: &column_schema.name, + } + ); + } + None => { + ensure!( + column_schema.is_nullable, + MissingColumnSnafu { + name: &column_schema.name, + } + ); + } + } + } + + // Check all columns in data also exists in schema. 
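+        // (The loop above checked schema -> data; this pass checks data -> schema.)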
+ for name in data.columns.keys() { + ensure!( + self.schema.column_schema_by_name(name).is_some(), + UnknownColumnSnafu { name } + ); + } + + Ok(()) + } + + fn add_num_rows(&mut self, len: usize) -> Result<()> { + let num_rows = self.num_rows + len; + ensure!( + num_rows <= MAX_BATCH_SIZE, + RequestTooLargeSnafu { num_rows } + ); + self.num_rows = num_rows; + Ok(()) + } +} + +impl<'a> IntoIterator for &'a WriteBatch { + type Item = &'a Mutation; + type IntoIter = slice::Iter<'a, Mutation>; + + fn into_iter(self) -> slice::Iter<'a, Mutation> { + self.iter() + } +} + +impl PutData { + fn add_column_by_name(&mut self, name: &str, vector: VectorRef) -> Result<()> { + ensure!( + !self.columns.contains_key(name), + DuplicateColumnSnafu { name } + ); + + if let Some(col) = self.columns.values().next() { + ensure!( + col.len() == vector.len(), + LenNotEqualsSnafu { + name, + expect: col.len(), + given: vector.len(), + } + ); + } + + self.columns.insert(name.to_string(), vector); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::iter; + use std::sync::Arc; + + use datatypes::type_id::LogicalTypeId; + use datatypes::vectors::{BooleanVector, Int32Vector, UInt64Vector}; + + use super::*; + use crate::test_util::write_batch_util; + + #[test] + fn test_put_data_basic() { + let mut put_data = PutData::new(); + assert!(put_data.is_empty()); + + let vector1 = Arc::new(Int32Vector::from_slice(&[1, 2, 3, 4, 5])); + let vector2 = Arc::new(UInt64Vector::from_slice(&[0, 2, 4, 6, 8])); + + put_data.add_key_column("k1", vector1.clone()).unwrap(); + put_data.add_version_column(vector2).unwrap(); + put_data.add_value_column("v1", vector1).unwrap(); + + assert_eq!(5, put_data.num_rows()); + assert!(!put_data.is_empty()); + + assert!(put_data.column_by_name("no such column").is_none()); + assert!(put_data.column_by_name("k1").is_some()); + assert!(put_data.column_by_name("v1").is_some()); + assert!(put_data + .column_by_name(consts::VERSION_COLUMN_NAME) + .is_some()); + } + + #[test] + fn test_put_data_empty_vector() { + let mut put_data = PutData::with_num_columns(1); + assert!(put_data.is_empty()); + + let vector1 = Arc::new(Int32Vector::from_slice(&[])); + put_data.add_key_column("k1", vector1).unwrap(); + + assert_eq!(0, put_data.num_rows()); + assert!(put_data.is_empty()); + } + + fn new_test_batch() -> WriteBatch { + write_batch_util::new_write_batch(&[ + ("k1", LogicalTypeId::UInt64, false), + (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false), + ("v1", LogicalTypeId::Boolean, true), + ]) + } + + #[test] + fn test_write_batch_put() { + let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])); + let boolv = Arc::new(BooleanVector::from(vec![true, false, true])); + + let mut put_data = PutData::new(); + put_data.add_key_column("k1", intv.clone()).unwrap(); + put_data.add_version_column(intv).unwrap(); + put_data.add_value_column("v1", boolv).unwrap(); + + let mut batch = new_test_batch(); + assert!(batch.is_empty()); + batch.put(put_data).unwrap(); + assert!(!batch.is_empty()); + + let mut iter = batch.iter(); + let Mutation::Put(put_data) = iter.next().unwrap(); + assert_eq!(3, put_data.num_rows()); + } + + fn check_err(err: Error, msg: &str) { + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + assert!(err.backtrace_opt().is_some()); + assert!(err.to_string().contains(msg)); + } + + #[test] + fn test_write_batch_too_large() { + let boolv = Arc::new(BooleanVector::from_iter( + iter::repeat(Some(true)).take(MAX_BATCH_SIZE + 1), + )); + + let mut put_data = PutData::new(); + 
put_data.add_key_column("k1", boolv).unwrap(); + + let mut batch = write_batch_util::new_write_batch(&[("k1", LogicalTypeId::Boolean, false)]); + let err = batch.put(put_data).err().unwrap(); + check_err(err, "Request is too large"); + } + + #[test] + fn test_put_data_duplicate() { + let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])); + + let mut put_data = PutData::new(); + put_data.add_key_column("k1", intv.clone()).unwrap(); + let err = put_data.add_key_column("k1", intv).err().unwrap(); + check_err(err, "Duplicate column k1"); + } + + #[test] + fn test_put_data_different_len() { + let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])); + let boolv = Arc::new(BooleanVector::from(vec![true, false])); + + let mut put_data = PutData::new(); + put_data.add_key_column("k1", intv).unwrap(); + let err = put_data.add_value_column("v1", boolv).err().unwrap(); + check_err(err, "Length of column v1 not equals"); + } + + #[test] + fn test_put_type_mismatch() { + let boolv = Arc::new(BooleanVector::from(vec![true, false, true])); + + let mut put_data = PutData::new(); + put_data.add_key_column("k1", boolv).unwrap(); + + let mut batch = new_test_batch(); + let err = batch.put(put_data).err().unwrap(); + check_err(err, "Type of column k1 does not match"); + } + + #[test] + fn test_put_type_has_null() { + let intv = Arc::new(UInt64Vector::from_iter(&[Some(1), None, Some(3)])); + + let mut put_data = PutData::new(); + put_data.add_key_column("k1", intv).unwrap(); + + let mut batch = new_test_batch(); + let err = batch.put(put_data).err().unwrap(); + check_err(err, "Column k1 is not null"); + } + + #[test] + fn test_put_missing_column() { + let boolv = Arc::new(BooleanVector::from(vec![true, false, true])); + + let mut put_data = PutData::new(); + put_data.add_key_column("v1", boolv).unwrap(); + + let mut batch = new_test_batch(); + let err = batch.put(put_data).err().unwrap(); + check_err(err, "Missing column k1"); + } + + #[test] + fn test_put_unknown_column() { + let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])); + let boolv = Arc::new(BooleanVector::from(vec![true, false, true])); + + let mut put_data = PutData::new(); + put_data.add_key_column("k1", intv.clone()).unwrap(); + put_data.add_version_column(intv).unwrap(); + put_data.add_value_column("v1", boolv.clone()).unwrap(); + put_data.add_value_column("v2", boolv).unwrap(); + + let mut batch = new_test_batch(); + let err = batch.put(put_data).err().unwrap(); + check_err(err, "Unknown column v2"); + } +} diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index e4dd1ce826..2058ff1b62 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -13,4 +13,4 @@ futures = "0.3" [dev-dependencies] async-stream = "0.3" -tokio = { version = "1.18", features = ["full"] } \ No newline at end of file +tokio = { version = "1.0", features = ["full"] } diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index dedc65006a..776894b76b 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -1,22 +1,27 @@ //! Storage APIs. 
-mod column_family; +pub mod consts; mod descriptors; mod engine; +mod metadata; mod region; mod requests; mod responses; mod snapshot; +mod types; pub use datatypes::data_type::ConcreteDataType; -pub use datatypes::schema::SchemaRef; +pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -pub use self::column_family::ColumnFamily; pub use self::descriptors::{ - ColumnDescriptor, ColumnFamilyDescriptor, KeyDescriptor, RegionDescriptor, + ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, + ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, RowKeyDescriptor, + RowKeyDescriptorBuilder, }; pub use self::engine::{EngineContext, StorageEngine}; +pub use self::metadata::RegionMeta; pub use self::region::{Region, WriteContext}; -pub use self::requests::{GetRequest, ScanRequest, WriteRequest}; +pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest}; pub use self::responses::{GetResponse, ScanResponse, WriteResponse}; pub use self::snapshot::{ReadContext, Snapshot}; +pub use self::types::{SequenceNumber, ValueType}; diff --git a/src/store-api/src/storage/column_family.rs b/src/store-api/src/storage/column_family.rs deleted file mode 100644 index f26a371b39..0000000000 --- a/src/store-api/src/storage/column_family.rs +++ /dev/null @@ -1,4 +0,0 @@ -/// A group of value columns. -pub trait ColumnFamily: Send + Sync + Clone { - fn name(&self) -> &str; -} diff --git a/src/store-api/src/storage/consts.rs b/src/store-api/src/storage/consts.rs new file mode 100644 index 0000000000..2effce4718 --- /dev/null +++ b/src/store-api/src/storage/consts.rs @@ -0,0 +1,27 @@ +//! Constants. + +use crate::storage::descriptors::{ColumnFamilyId, ColumnId}; + +// Ids reserved for internal column families: + +/// Column family Id for row key columns. +/// +/// This is virtual column family, actually row key columns are not +/// stored in any column family. +pub const KEY_CF_ID: ColumnFamilyId = 0; +/// Id for default column family. +pub const DEFAULT_CF_ID: ColumnFamilyId = 1; + +// Ids reserved for internal columns: + +// TODO(yingwen): Reserve one bit for internal columns. +/// Column id for version column. +pub const VERSION_COLUMN_ID: ColumnId = 1; + +// Names reserved for internal columns: + +/// Name of version column. +pub const VERSION_COLUMN_NAME: &str = "__version"; + +// Names for default column family. +pub const DEFAULT_CF_NAME: &str = "default"; diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs index a91cc9d8c0..97d5b08b60 100644 --- a/src/store-api/src/storage/descriptors.rs +++ b/src/store-api/src/storage/descriptors.rs @@ -1,17 +1,38 @@ -use crate::storage::ConcreteDataType; +use datatypes::value::Value; -/// A [ColumnDescriptor] contains information about a column. -#[derive(Debug)] +use crate::storage::{consts, ColumnSchema, ConcreteDataType}; + +/// Id of column, unique in each region. +pub type ColumnId = u32; +/// Id of column family, unique in each region. +pub type ColumnFamilyId = u32; + +// TODO(yingwen): Validate default value has same type with column, and name is a valid column name. +/// A [ColumnDescriptor] contains information to create a column. +#[derive(Debug, Clone)] pub struct ColumnDescriptor { + pub id: ColumnId, pub name: String, pub data_type: ConcreteDataType, + /// Is column nullable, default is true. 
pub is_nullable: bool, + /// Default value of column, default is None, which means no default value + /// for this column, and user must provide value for a not-null column. + pub default_value: Option, + pub comment: String, } -/// A [KeyDescriptor] contains information about a row key. -#[derive(Debug)] -pub struct KeyDescriptor { +impl From<&ColumnDescriptor> for ColumnSchema { + fn from(desc: &ColumnDescriptor) -> ColumnSchema { + ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable) + } +} + +/// A [RowKeyDescriptor] contains information about row key. +#[derive(Debug, Clone)] +pub struct RowKeyDescriptor { pub columns: Vec, + /// Timestamp key column. pub timestamp: ColumnDescriptor, /// Enable version column in row key if this field is true. /// @@ -19,21 +40,250 @@ pub struct KeyDescriptor { pub enable_version_column: bool, } -/// A [ColumnFamilyDescriptor] contains information about a column family. -#[derive(Debug)] +/// A [ColumnFamilyDescriptor] contains information to create a column family. +#[derive(Debug, Clone)] pub struct ColumnFamilyDescriptor { + pub cf_id: ColumnFamilyId, pub name: String, /// Descriptors of columns in this column family. pub columns: Vec, } -/// A [RegionDescriptor] contains information about a region. -#[derive(Debug)] +/// A [RegionDescriptor] contains information to create a region. +#[derive(Debug, Clone)] pub struct RegionDescriptor { + /// Region name. + pub name: String, /// Row key descriptor of this region. - pub key: KeyDescriptor, + pub row_key: RowKeyDescriptor, /// Default column family. pub default_cf: ColumnFamilyDescriptor, /// Extra column families defined by user. pub extra_cfs: Vec, } + +pub struct ColumnDescriptorBuilder { + id: ColumnId, + name: String, + data_type: ConcreteDataType, + is_nullable: bool, + default_value: Option, + comment: String, +} + +impl ColumnDescriptorBuilder { + pub fn new>(id: ColumnId, name: T, data_type: ConcreteDataType) -> Self { + Self { + id, + name: name.into(), + data_type, + is_nullable: true, + default_value: None, + comment: "".to_string(), + } + } + + pub fn is_nullable(mut self, is_nullable: bool) -> Self { + self.is_nullable = is_nullable; + self + } + + pub fn default_value(mut self, value: Option) -> Self { + self.default_value = value; + self + } + + pub fn comment>(mut self, comment: T) -> Self { + self.comment = comment.into(); + self + } + + pub fn build(self) -> ColumnDescriptor { + ColumnDescriptor { + id: self.id, + name: self.name, + data_type: self.data_type, + is_nullable: self.is_nullable, + default_value: self.default_value, + comment: self.comment, + } + } +} + +pub struct RowKeyDescriptorBuilder { + columns: Vec, + timestamp: ColumnDescriptor, + enable_version_column: bool, +} + +impl RowKeyDescriptorBuilder { + pub fn new(timestamp: ColumnDescriptor) -> Self { + Self { + columns: Vec::new(), + timestamp, + enable_version_column: true, + } + } + + pub fn columns_capacity(mut self, capacity: usize) -> Self { + self.columns.reserve(capacity); + self + } + + pub fn push_column(mut self, column: ColumnDescriptor) -> Self { + self.columns.push(column); + self + } + + pub fn enable_version_column(mut self, enable: bool) -> Self { + self.enable_version_column = enable; + self + } + + pub fn build(self) -> RowKeyDescriptor { + RowKeyDescriptor { + columns: self.columns, + timestamp: self.timestamp, + enable_version_column: self.enable_version_column, + } + } +} + +pub struct ColumnFamilyDescriptorBuilder { + cf_id: ColumnFamilyId, + name: String, + columns: Vec, +} + 
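+// The builder starts from the default column family (`consts::DEFAULT_CF_ID` /
+// `consts::DEFAULT_CF_NAME`); callers override these via `cf_id` and `name`.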
+impl ColumnFamilyDescriptorBuilder { + pub fn new() -> Self { + Self { + cf_id: consts::DEFAULT_CF_ID, + name: consts::DEFAULT_CF_NAME.to_string(), + columns: Vec::new(), + } + } + + pub fn cf_id(mut self, cf_id: ColumnFamilyId) -> Self { + self.cf_id = cf_id; + self + } + + pub fn name>(mut self, name: T) -> Self { + self.name = name.into(); + self + } + + pub fn columns_capacity(mut self, capacity: usize) -> Self { + self.columns.reserve(capacity); + self + } + + pub fn push_column(mut self, column: ColumnDescriptor) -> Self { + self.columns.push(column); + self + } + + pub fn build(self) -> ColumnFamilyDescriptor { + ColumnFamilyDescriptor { + cf_id: self.cf_id, + name: self.name, + columns: self.columns, + } + } +} + +impl Default for ColumnFamilyDescriptorBuilder { + fn default() -> ColumnFamilyDescriptorBuilder { + ColumnFamilyDescriptorBuilder::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn new_column_desc_builder() -> ColumnDescriptorBuilder { + ColumnDescriptorBuilder::new(3, "test", ConcreteDataType::int32_datatype()) + } + + #[test] + fn test_column_descriptor_builder() { + let desc = new_column_desc_builder().build(); + assert_eq!(3, desc.id); + assert_eq!("test", desc.name); + assert_eq!(ConcreteDataType::int32_datatype(), desc.data_type); + assert!(desc.is_nullable); + assert!(desc.default_value.is_none()); + assert!(desc.comment.is_empty()); + + let desc = new_column_desc_builder().is_nullable(false).build(); + assert!(!desc.is_nullable); + + let desc = new_column_desc_builder() + .default_value(Some(Value::Null)) + .build(); + assert_eq!(Value::Null, desc.default_value.unwrap()); + + let desc = new_column_desc_builder() + .default_value(Some(Value::Int32(123))) + .build(); + assert_eq!(Value::Int32(123), desc.default_value.unwrap()); + + let desc = new_column_desc_builder().comment("A test column").build(); + assert_eq!("A test column", desc.comment); + } + + fn new_timestamp_desc() -> ColumnDescriptor { + ColumnDescriptorBuilder::new(5, "timestamp", ConcreteDataType::int64_datatype()).build() + } + + #[test] + fn test_row_key_descriptor_builder() { + let timestamp = new_timestamp_desc(); + + let desc = RowKeyDescriptorBuilder::new(timestamp.clone()).build(); + assert!(desc.columns.is_empty()); + assert!(desc.enable_version_column); + + let desc = RowKeyDescriptorBuilder::new(timestamp.clone()) + .columns_capacity(1) + .push_column( + ColumnDescriptorBuilder::new(6, "c1", ConcreteDataType::int32_datatype()).build(), + ) + .push_column( + ColumnDescriptorBuilder::new(7, "c2", ConcreteDataType::int32_datatype()).build(), + ) + .build(); + assert_eq!(2, desc.columns.len()); + assert!(desc.enable_version_column); + + let desc = RowKeyDescriptorBuilder::new(timestamp) + .enable_version_column(false) + .build(); + assert!(desc.columns.is_empty()); + assert!(!desc.enable_version_column); + } + + #[test] + fn test_cf_descriptor_builder() { + let desc = ColumnFamilyDescriptorBuilder::default().build(); + assert_eq!(consts::DEFAULT_CF_ID, desc.cf_id); + assert_eq!(consts::DEFAULT_CF_NAME, desc.name); + assert!(desc.columns.is_empty()); + + let desc = ColumnFamilyDescriptorBuilder::new() + .cf_id(32) + .name("cf1") + .build(); + assert_eq!(32, desc.cf_id); + assert_eq!("cf1", desc.name); + + let desc = ColumnFamilyDescriptorBuilder::new() + .push_column( + ColumnDescriptorBuilder::new(6, "c1", ConcreteDataType::int32_datatype()).build(), + ) + .build(); + assert_eq!(1, desc.columns.len()); + } +} diff --git a/src/store-api/src/storage/engine.rs 
b/src/store-api/src/storage/engine.rs index 42e45813ff..a74dd39e42 100644 --- a/src/store-api/src/storage/engine.rs +++ b/src/store-api/src/storage/engine.rs @@ -4,33 +4,52 @@ //! a [`StorageEngine`] instance manages a bunch of storage unit called [`Region`], which holds //! chunks of rows, support operations like PUT/DELETE/SCAN. +use async_trait::async_trait; use common_error::ext::ErrorExt; use crate::storage::descriptors::RegionDescriptor; use crate::storage::region::Region; /// Storage engine provides primitive operations to store and access data. +#[async_trait] pub trait StorageEngine: Send + Sync + Clone { type Error: ErrorExt + Send + Sync; type Region: Region; - /// Open an existing region. - fn open_region(&self, ctx: &EngineContext, name: &str) -> Result; + /// Opens an existing region. + async fn open_region( + &self, + ctx: &EngineContext, + name: &str, + ) -> Result; - /// Close given region. - fn close_region(&self, ctx: &EngineContext, region: Self::Region) -> Result<(), Self::Error>; + /// Closes given region. + async fn close_region( + &self, + ctx: &EngineContext, + region: Self::Region, + ) -> Result<(), Self::Error>; - /// Create and return a new region. - fn create_region( + /// Creates and returns the created region. + /// + /// Returns exsiting region if region with same name already exists. The region will + /// be opened before returning. + async fn create_region( &self, ctx: &EngineContext, descriptor: RegionDescriptor, ) -> Result; - /// Drop given region. - fn drop_region(&self, ctx: &EngineContext, region: Self::Region) -> Result<(), Self::Error>; + /// Drops given region. + /// + /// The region will be closed before dropping. + async fn drop_region( + &self, + ctx: &EngineContext, + region: Self::Region, + ) -> Result<(), Self::Error>; - /// Return the opened region with given name. + /// Returns the opened region with given name. fn get_region( &self, ctx: &EngineContext, @@ -39,5 +58,5 @@ pub trait StorageEngine: Send + Sync + Clone { } /// Storage engine context. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct EngineContext {} diff --git a/src/store-api/src/storage/metadata.rs b/src/store-api/src/storage/metadata.rs new file mode 100644 index 0000000000..a2b434bf19 --- /dev/null +++ b/src/store-api/src/storage/metadata.rs @@ -0,0 +1,7 @@ +use crate::storage::SchemaRef; + +/// Metadata of a region. +pub trait RegionMeta: Send + Sync { + /// Returns the schema of the region. + fn schema(&self) -> &SchemaRef; +} diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index 81e149d0ea..f6d251f177 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -1,5 +1,5 @@ //! Region holds chunks of rows stored in the storage engine, but does not require that -//! rows must have continuous primary key range, which is implementation sepecific. +//! rows must have continuous primary key range, which is implementation specific. //! //! Regions support operations like PUT/DELETE/SCAN that most key-value stores provide. //! However, unlike key-value store, data stored in region has data model like: @@ -9,37 +9,39 @@ //! ``` //! //! The data model require each row -//! - has 0 ~ m key column -//! - **MUST** has a timestamp column -//! - has a version column -//! - has 0 ~ n value column +//! - has 0 ~ m key column, parts of row key columns; +//! - **MUST** has a timestamp column, part of row key columns; +//! - has a version column, part of row key columns; +//! 
- has 0 ~ n value column. //! -//! Each row is identify by (value of key columns, timestamp, version), which forms +//! Each row is identified by (value of key columns, timestamp, version), which forms //! a row key. Note that the implementation may allow multiple rows have same row -//! key (like ClickHouse), which is useful is analytic scenario. +//! key (like ClickHouse), which is useful in analytic scenario. +use async_trait::async_trait; use common_error::ext::ErrorExt; -use crate::storage::column_family::ColumnFamily; +use crate::storage::metadata::RegionMeta; use crate::storage::requests::WriteRequest; use crate::storage::responses::WriteResponse; use crate::storage::snapshot::{ReadContext, Snapshot}; -use crate::storage::SchemaRef; /// Chunks of rows in storage engine. +#[async_trait] pub trait Region: Send + Sync + Clone { type Error: ErrorExt + Send + Sync; + type Meta: RegionMeta; type WriteRequest: WriteRequest; - type ColumnFamily: ColumnFamily; type Snapshot: Snapshot; - fn schema(&self) -> &SchemaRef; + /// Returns name of the region. + fn name(&self) -> &str; - /// List all column families. - fn list_cf(&self) -> Result, Self::Error>; + /// Returns the in memory metadata of this region. + fn in_memory_metadata(&self) -> Self::Meta; /// Write updates to region. - fn write( + async fn write( &self, ctx: &WriteContext, request: Self::WriteRequest, diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 264ff6192a..b83b2d57f4 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -1,8 +1,30 @@ -use crate::storage::column_family::ColumnFamily; +use common_error::ext::ErrorExt; +use datatypes::schema::SchemaRef; +use datatypes::vectors::VectorRef; /// Write request holds a collection of updates to apply to a region. pub trait WriteRequest: Send { - type ColumnFamily: ColumnFamily; + type Error: ErrorExt + Send + Sync; + type PutOp: PutOperation; + + fn new(schema: SchemaRef) -> Self; + + fn put(&mut self, put: Self::PutOp) -> Result<(), Self::Error>; +} + +/// Put multiple rows. +pub trait PutOperation { + type Error: ErrorExt + Send + Sync; + + fn new() -> Self; + + fn with_num_columns(num_columns: usize) -> Self; + + fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>; + + fn add_version_column(&mut self, vector: VectorRef) -> Result<(), Self::Error>; + + fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>; } #[derive(Debug)] diff --git a/src/store-api/src/storage/snapshot.rs b/src/store-api/src/storage/snapshot.rs index 6913d8b96e..9527ac8635 100644 --- a/src/store-api/src/storage/snapshot.rs +++ b/src/store-api/src/storage/snapshot.rs @@ -1,23 +1,25 @@ +use async_trait::async_trait; use common_error::ext::ErrorExt; use datatypes::schema::SchemaRef; -use crate::storage::column_family::ColumnFamily; use crate::storage::requests::{GetRequest, ScanRequest}; use crate::storage::responses::{GetResponse, ScanResponse}; /// A consistent read-only view of region. +#[async_trait] pub trait Snapshot: Send + Sync { type Error: ErrorExt + Send + Sync; - type ColumnFamily: ColumnFamily; fn schema(&self) -> &SchemaRef; - fn scan(&self, ctx: &ReadContext, request: ScanRequest) -> Result; + async fn scan( + &self, + ctx: &ReadContext, + request: ScanRequest, + ) -> Result; - fn get(&self, ctx: &ReadContext, request: GetRequest) -> Result; - - /// List all column families. 
diff --git a/src/store-api/src/storage/snapshot.rs b/src/store-api/src/storage/snapshot.rs
index 6913d8b96e..9527ac8635 100644
--- a/src/store-api/src/storage/snapshot.rs
+++ b/src/store-api/src/storage/snapshot.rs
@@ -1,23 +1,25 @@
+use async_trait::async_trait;
 use common_error::ext::ErrorExt;
 use datatypes::schema::SchemaRef;
 
-use crate::storage::column_family::ColumnFamily;
 use crate::storage::requests::{GetRequest, ScanRequest};
 use crate::storage::responses::{GetResponse, ScanResponse};
 
 /// A consistent read-only view of region.
+#[async_trait]
 pub trait Snapshot: Send + Sync {
     type Error: ErrorExt + Send + Sync;
-    type ColumnFamily: ColumnFamily;
 
     fn schema(&self) -> &SchemaRef;
 
-    fn scan(&self, ctx: &ReadContext, request: ScanRequest) -> Result<ScanResponse, Self::Error>;
+    async fn scan(
+        &self,
+        ctx: &ReadContext,
+        request: ScanRequest,
+    ) -> Result<ScanResponse, Self::Error>;
 
-    fn get(&self, ctx: &ReadContext, request: GetRequest) -> Result<GetResponse, Self::Error>;
-
-    /// List all column families.
-    fn list_cf(&self) -> Result<Vec<Self::ColumnFamily>, Self::Error>;
+    async fn get(&self, ctx: &ReadContext, request: GetRequest)
+        -> Result<GetResponse, Self::Error>;
 }
 
 /// Context for read.
diff --git a/src/store-api/src/storage/types.rs b/src/store-api/src/storage/types.rs
new file mode 100644
index 0000000000..e2679b1e6d
--- /dev/null
+++ b/src/store-api/src/storage/types.rs
@@ -0,0 +1,12 @@
+//! Common types.
+
+/// Represents a sequence number of data in storage. The offset of the log store can be
+/// used as a sequence number.
+pub type SequenceNumber = u64;
+
+/// Operation type of the value to write to storage.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum ValueType {
+    /// Put operation.
+    Put,
+}
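`ValueType` deriving `Ord` alongside `SequenceNumber` suggests how a memtable can order entries that share a user key: compare the key first, then the sequence, then the operation type. A hypothetical illustration of such a composite key (assuming both types are re-exported from `store_api::storage`), not the engine's actual key layout:

```rust
use store_api::storage::{SequenceNumber, ValueType};

/// A hypothetical internal key: the derived ordering compares fields in
/// declaration order, so rows sharing a user key sort by sequence number,
/// then by operation type.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct InternalKey {
    user_key: Vec<u8>,
    sequence: SequenceNumber,
    value_type: ValueType,
}

fn main() {
    let older = InternalKey {
        user_key: b"k1".to_vec(),
        sequence: 1,
        value_type: ValueType::Put,
    };
    let newer = InternalKey {
        user_key: b"k1".to_vec(),
        sequence: 2,
        value_type: ValueType::Put,
    };
    // Same user key: the entry written later (higher sequence) sorts after.
    assert!(older < newer);
}
```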
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 1f20c6a0bb..6760db3f39 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -3,11 +3,6 @@ name = "table"
 version = "0.1.0"
 edition = "2021"
 
-[dependencies.arrow]
-package = "arrow2"
-version="0.10"
-features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
-
 [dependencies]
 async-trait = "0.1"
 chrono = { version = "0.4", features = ["serde"] }
diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs
index e1328b106b..b15675cbff 100644
--- a/src/table/src/table/adapter.rs
+++ b/src/table/src/table/adapter.rs
@@ -6,7 +6,6 @@ use std::fmt::Debug;
 use std::mem;
 use std::sync::{Arc, Mutex};
 
-use arrow::error::{ArrowError, Result as ArrowResult};
 use common_query::logical_plan::Expr;
 use common_recordbatch::error::{self as recordbatch_error, Result as RecordBatchResult};
 use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
@@ -25,8 +24,9 @@ use datafusion::physical_plan::{
     SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
 };
 use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
+use datatypes::arrow::error::{ArrowError, Result as ArrowResult};
 use datatypes::schema::SchemaRef as TableSchemaRef;
-use datatypes::schema::{Schema, SchemaRef};
+use datatypes::schema::SchemaRef;
 use futures::Stream;
 use snafu::prelude::*;
 
@@ -109,6 +109,10 @@ impl DfTableProviderAdapter {
     pub fn new(table: TableRef) -> Self {
         Self { table }
     }
+
+    pub fn table(&self) -> TableRef {
+        self.table.clone()
+    }
 }
 
 #[async_trait::async_trait]
@@ -160,16 +164,18 @@ impl TableProvider for DfTableProviderAdapter {
 
 /// Datafusion TableProvider -> greptime Table
 pub struct TableAdapter {
+    schema: TableSchemaRef,
     table_provider: Arc<dyn TableProvider>,
     runtime: Arc<Runtime>,
 }
 
 impl TableAdapter {
-    pub fn new(table_provider: Arc<dyn TableProvider>, runtime: Arc<Runtime>) -> Self {
-        Self {
+    pub fn new(table_provider: Arc<dyn TableProvider>, runtime: Arc<Runtime>) -> Result<Self> {
+        Ok(Self {
+            schema: Arc::new(table_provider.schema().try_into().unwrap()),
             table_provider,
             runtime,
-        }
+        })
     }
 }
 
@@ -180,7 +186,7 @@ impl Table for TableAdapter {
     }
 
     fn schema(&self) -> TableSchemaRef {
-        Arc::new(self.table_provider.schema().into())
+        self.schema.clone()
     }
 
     fn table_type(&self) -> TableType {
@@ -211,7 +217,10 @@ impl Table for TableAdapter {
             .await
             .context(error::DatafusionSnafu)?;
 
-        Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream)))
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            df_stream,
+        )))
     }
 
     fn supports_filter_pushdown(&self, filter: &Expr) -> Result {
@@ -268,18 +277,19 @@ impl Stream for DfRecordBatchStreamAdapter {
 
 /// Datafusion SendableRecordBatchStream to greptime RecordBatchStream
 pub struct RecordBatchStreamAdapter {
+    schema: SchemaRef,
     stream: DfSendableRecordBatchStream,
 }
 
 impl RecordBatchStreamAdapter {
-    pub fn new(stream: DfSendableRecordBatchStream) -> Self {
-        Self { stream }
+    pub fn new(schema: SchemaRef, stream: DfSendableRecordBatchStream) -> Self {
+        Self { schema, stream }
     }
 }
 
 impl RecordBatchStream for RecordBatchStreamAdapter {
     fn schema(&self) -> SchemaRef {
-        Arc::new(Schema::new(self.stream.schema()))
+        self.schema.clone()
     }
 }
diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs
index 62a5857852..cb47404f7c 100644
--- a/src/table/src/table/numbers.rs
+++ b/src/table/src/table/numbers.rs
@@ -2,13 +2,12 @@ use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
 
-use arrow::array::UInt32Array;
-use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
 use common_recordbatch::error::Result as RecordBatchResult;
 use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
-use datafusion::field_util::SchemaExt;
 use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
+use datatypes::arrow::array::UInt32Array;
+use datatypes::data_type::ConcreteDataType;
+use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
 use futures::task::{Context, Poll};
 use futures::Stream;
 
@@ -16,20 +15,20 @@ use crate::error::Result;
 use crate::table::{Expr, Table};
 
 /// numbers table for test
-#[derive(Debug, Clone, Eq, PartialEq)]
+#[derive(Debug, Clone)]
 pub struct NumbersTable {
     schema: SchemaRef,
 }
 
 impl Default for NumbersTable {
     fn default() -> Self {
-        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+        let column_schemas = vec![ColumnSchema::new(
             "number",
-            DataType::UInt32,
+            ConcreteDataType::uint32_datatype(),
             false,
-        )]));
+        )];
         Self {
-            schema: Arc::new(Schema::new(arrow_schema)),
+            schema: Arc::new(Schema::new(column_schemas)),
         }
     }
 }
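For comparison, the construction pattern `NumbersTable::default` switches to, shown standalone: a schema is now built from typed `ColumnSchema`s instead of a raw Arrow schema. The free function is illustrative only:

```rust
use std::sync::Arc;

use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};

/// Builds the single-column "number" schema from typed column schemas,
/// mirroring what `NumbersTable::default` does after this patch.
fn number_schema() -> SchemaRef {
    let column_schemas = vec![ColumnSchema::new(
        "number",
        ConcreteDataType::uint32_datatype(),
        false, // the column is not nullable
    )];
    Arc::new(Schema::new(column_schemas))
}
```

The same conversion cost motivates the `TableAdapter` change above: the fallible DataFusion-to-greptime schema conversion now happens once in `TableAdapter::new`, and `schema()` and the stream adapter hand out cheap `Arc` clones afterwards.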