diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index b98d7a31c0..9f5f880fdd 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -81,12 +81,6 @@ pub enum Error { source: common_recordbatch::error::Error, }, - #[snafu(display("Failed to build table schema for system catalog table"))] - SystemCatalogSchema { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display( "Failed to insert table creation record to system catalog, source: {}", source @@ -116,7 +110,7 @@ impl ErrorExt for Error { | Error::OpenTable { .. } | Error::ReadSystemCatalog { .. } | Error::InsertTableRecord { .. } => StatusCode::StorageUnavailable, - Error::RegisterTable { .. } | Error::SystemCatalogSchema { .. } => StatusCode::Internal, + Error::RegisterTable { .. } => StatusCode::Internal, Error::TableExists { .. } => StatusCode::TableAlreadyExists, } } diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index fb2a1eb677..25ad3a3eaa 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -7,7 +7,7 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use common_time::util; use datatypes::prelude::{ConcreteDataType, ScalarVector}; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; use datatypes::vectors::{BinaryVector, Int64Vector, UInt8Vector}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; @@ -22,7 +22,7 @@ use crate::consts::{ }; use crate::error::{ CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu, - OpenSystemCatalogSnafu, Result, SystemCatalogSchemaSnafu, ValueDeserializeSnafu, + OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu, }; pub const ENTRY_TYPE_INDEX: usize = 0; @@ -68,7 +68,7 @@ impl SystemCatalogTable { table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), table_id: SYSTEM_CATALOG_TABLE_ID, }; - let schema = Arc::new(build_system_catalog_schema()?); + let schema = Arc::new(build_system_catalog_schema()); let ctx = EngineContext::default(); if let Some(table) = engine @@ -107,12 +107,14 @@ impl SystemCatalogTable { } /// Build system catalog table schema. -/// A system catalog table consists of 4 columns, namely +/// A system catalog table consists of 6 columns, namely /// - entry_type: type of entry in current row, can be any variant of [EntryType]. /// - key: a binary encoded key of entry, differs according to different entry type. /// - timestamp: currently not used. /// - value: JSON-encoded value of entry's metadata. -fn build_system_catalog_schema() -> Result { +/// - gmt_created: create time of this metadata. +/// - gmt_modified: last updated time of this metadata. +fn build_system_catalog_schema() -> Schema { let cols = vec![ ColumnSchema::new( "entry_type".to_string(), @@ -146,7 +148,11 @@ fn build_system_catalog_schema() -> Result { ), ]; - Schema::with_timestamp_index(cols, TIMESTAMP_INDEX).context(SystemCatalogSchemaSnafu) + // The schema of this table must be valid. 
+ SchemaBuilder::from(cols) + .timestamp_index(2) + .build() + .unwrap() } pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest { diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 1287011a71..0a38ea501a 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -193,7 +193,7 @@ mod tests { use common_recordbatch::SendableRecordBatchStream; use datatypes::{ data_type::ConcreteDataType, - schema::{ColumnSchema, Schema, SchemaRef}, + schema::{ColumnSchema, SchemaBuilder, SchemaRef}, value::Value, }; use table::error::Result as TableResult; @@ -280,7 +280,12 @@ mod tests { ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), ]; - Arc::new(Schema::with_timestamp_index(column_schemas, 3).unwrap()) + Arc::new( + SchemaBuilder::from(column_schemas) + .timestamp_index(3) + .build() + .unwrap(), + ) } async fn scan( &self, diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index e42252c4b8..248c36a5db 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -64,7 +64,7 @@ mod tests { use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; + use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use datatypes::value::Value; use log_store::fs::noop::NoopLogStore; use object_store::{backend::fs::Backend, ObjectStore}; @@ -96,7 +96,12 @@ mod tests { ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), ]; - Arc::new(Schema::with_timestamp_index(column_schemas, 3).unwrap()) + Arc::new( + SchemaBuilder::from(column_schemas) + .timestamp_index(3) + .build() + .unwrap(), + ) } async fn scan( &self, diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index f0f39db0cd..b91429a582 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use catalog::RegisterTableRequest; use common_telemetry::tracing::info; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::schema::{ColumnSchema, SchemaBuilder}; use query::query_engine::Output; use snafu::{OptionExt, ResultExt}; use sql::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName, TableConstraint}; @@ -134,7 +134,10 @@ impl SqlHandler { .collect::>>()?; let schema = Arc::new( - Schema::with_timestamp_index(columns_schemas, ts_index).context(CreateSchemaSnafu)?, + SchemaBuilder::from(columns_schemas) + .timestamp_index(ts_index) + .build() + .context(CreateSchemaSnafu)?, ); let request = CreateTableRequest { diff --git a/src/datanode/tests/http_test.rs b/src/datanode/tests/http_test.rs index 7a44b687f9..2720bdff66 100644 --- a/src/datanode/tests/http_test.rs +++ b/src/datanode/tests/http_test.rs @@ -40,7 +40,7 @@ async fn test_sql_api() { let body = res.text().await; assert_eq!( body, - r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"# + r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"# ); } diff --git a/src/datanode/tests/test_util.rs b/src/datanode/tests/test_util.rs index 8a86b3b40a..38d9377dc1 100644 
--- a/src/datanode/tests/test_util.rs +++ b/src/datanode/tests/test_util.rs @@ -5,7 +5,7 @@ use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::Instance; use datatypes::data_type::ConcreteDataType; -use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::schema::{ColumnSchema, SchemaBuilder}; use snafu::ResultExt; use table::engine::EngineContext; use table::engine::TableEngineRef; @@ -62,7 +62,9 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> { table_name: table_name.to_string(), desc: Some(" a test table".to_string()), schema: Arc::new( - Schema::with_timestamp_index(column_schemas, 3) + SchemaBuilder::from(column_schemas) + .timestamp_index(3) + .build() .expect("ts is expected to be timestamp column"), ), create_if_not_exists: true, diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index be367f5bd1..3a358b43af 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -42,6 +42,17 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display( + "Failed to parse version in schema meta, value: {}, source: {}", + value, + source + ))] + ParseSchemaVersion { + value: String, + source: std::num::ParseIntError, + backtrace: Backtrace, + }, + #[snafu(display("Invalid timestamp index: {}", index))] InvalidTimestampIndex { index: usize, backtrace: Backtrace }, } diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs index 91c76247d2..19f0fdfc18 100644 --- a/src/datatypes/src/lib.rs +++ b/src/datatypes/src/lib.rs @@ -15,3 +15,4 @@ pub mod value; pub mod vectors; pub use arrow; +pub use error::{Error, Result}; diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 0b0ec7c211..1f5729464a 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -1,7 +1,8 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; -use arrow::datatypes::{Field, Metadata, Schema as ArrowSchema}; +pub use arrow::datatypes::Metadata; +use arrow::datatypes::{Field, Schema as ArrowSchema}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -9,9 +10,7 @@ use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Error, Result}; const TIMESTAMP_INDEX_KEY: &str = "greptime:timestamp_index"; - -// TODO(yingwen): consider assign a version to schema so compare schema can be -// done by compare version. +const VERSION_KEY: &str = "greptime:version"; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ColumnSchema { @@ -34,6 +33,7 @@ impl ColumnSchema { } } +/// A common schema, should be immutable. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Schema { column_schemas: Vec, @@ -44,43 +44,27 @@ pub struct Schema { /// Timestamp key column is the column holds the timestamp and forms part of /// the primary key. None means there is no timestamp key column. timestamp_index: Option, + /// Version of the schema. + /// + /// Initial value is zero. The version should bump after altering schema. + version: u32, } impl Schema { + /// Initial version of the schema. 
+ pub const INITIAL_VERSION: u32 = 0; + pub fn new(column_schemas: Vec) -> Schema { - let (arrow_schema, name_to_index) = collect_column_schemas(&column_schemas); - - Schema { - column_schemas, - name_to_index, - arrow_schema: Arc::new(arrow_schema), - timestamp_index: None, - } - } - - pub fn with_timestamp_index( - column_schemas: Vec, - timestamp_index: usize, - ) -> Result { - let (arrow_schema, name_to_index) = collect_column_schemas(&column_schemas); - let mut metadata = BTreeMap::new(); - metadata.insert(TIMESTAMP_INDEX_KEY.to_string(), timestamp_index.to_string()); - let arrow_schema = Arc::new(arrow_schema.with_metadata(metadata)); - - validate_timestamp_index(&column_schemas, timestamp_index)?; - - Ok(Schema { - column_schemas, - name_to_index, - arrow_schema, - timestamp_index: Some(timestamp_index), - }) + // Builder won't fail + SchemaBuilder::from(column_schemas).build().unwrap() } + #[inline] pub fn arrow_schema(&self) -> &Arc { &self.arrow_schema } + #[inline] pub fn column_schemas(&self) -> &[ColumnSchema] { &self.column_schemas } @@ -106,11 +90,89 @@ impl Schema { pub fn timestamp_column(&self) -> Option<&ColumnSchema> { self.timestamp_index.map(|idx| &self.column_schemas[idx]) } + + #[inline] + pub fn version(&self) -> u32 { + self.version + } + + #[inline] + pub fn metadata(&self) -> &Metadata { + &self.arrow_schema.metadata + } } -fn collect_column_schemas( - column_schemas: &[ColumnSchema], -) -> (ArrowSchema, HashMap) { +#[derive(Default)] +pub struct SchemaBuilder { + column_schemas: Vec, + name_to_index: HashMap, + fields: Vec, + timestamp_index: Option, + version: u32, + metadata: Metadata, +} + +impl From> for SchemaBuilder { + fn from(column_schemas: Vec) -> SchemaBuilder { + SchemaBuilder::from_columns(column_schemas) + } +} + +impl SchemaBuilder { + pub fn from_columns(column_schemas: Vec) -> Self { + let (fields, name_to_index) = collect_fields(&column_schemas); + + Self { + column_schemas, + name_to_index, + fields, + ..Default::default() + } + } + + /// Set timestamp index. + /// + /// The validation of timestamp column is done in `build()`. + pub fn timestamp_index(mut self, timestamp_index: usize) -> Self { + self.timestamp_index = Some(timestamp_index); + self + } + + pub fn version(mut self, version: u32) -> Self { + self.version = version; + self + } + + /// Add key value pair to metadata. + /// + /// Old metadata with same key would be overwritten. 
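For orientation, a minimal usage sketch of the builder introduced here (column names and values are illustrative, mirroring the tests added further down in this patch):

```rust
let column_schemas = vec![
    ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
    ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false),
];

// Replaces the removed `Schema::with_timestamp_index(column_schemas, 1)`.
let schema = SchemaBuilder::from(column_schemas)
    .timestamp_index(1)       // validated in build()
    .version(123)             // stored under the "greptime:version" metadata key
    .add_metadata("k1", "v1") // extra entries land in the arrow schema metadata
    .build()
    .unwrap();

assert_eq!(123, schema.version());
assert_eq!("v1", schema.metadata().get("k1").unwrap());

// Version and timestamp index survive a round trip through the arrow schema.
let restored = Schema::try_from(schema.arrow_schema().clone()).unwrap();
assert_eq!(schema, restored);
```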
+ pub fn add_metadata(mut self, key: impl Into, value: impl Into) -> Self { + self.metadata.insert(key.into(), value.into()); + self + } + + pub fn build(mut self) -> Result { + if let Some(timestamp_index) = self.timestamp_index { + validate_timestamp_index(&self.column_schemas, timestamp_index)?; + self.metadata + .insert(TIMESTAMP_INDEX_KEY.to_string(), timestamp_index.to_string()); + } + self.metadata + .insert(VERSION_KEY.to_string(), self.version.to_string()); + + let arrow_schema = ArrowSchema::from(self.fields).with_metadata(self.metadata); + + Ok(Schema { + column_schemas: self.column_schemas, + name_to_index: self.name_to_index, + arrow_schema: Arc::new(arrow_schema), + timestamp_index: self.timestamp_index, + version: self.version, + }) + } +} + +fn collect_fields(column_schemas: &[ColumnSchema]) -> (Vec, HashMap) { let mut fields = Vec::with_capacity(column_schemas.len()); let mut name_to_index = HashMap::with_capacity(column_schemas.len()); for (index, column_schema) in column_schemas.iter().enumerate() { @@ -119,7 +181,7 @@ fn collect_column_schemas( name_to_index.insert(column_schema.name.clone(), index); } - (ArrowSchema::from(fields), name_to_index) + (fields, name_to_index) } fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> { @@ -183,16 +245,28 @@ impl TryFrom> for Schema { if let Some(index) = timestamp_index { validate_timestamp_index(&column_schemas, index)?; } + let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?; Ok(Self { column_schemas, name_to_index, arrow_schema, timestamp_index, + version, }) } } +impl TryFrom for Schema { + type Error = Error; + + fn try_from(arrow_schema: ArrowSchema) -> Result { + let arrow_schema = Arc::new(arrow_schema); + + Schema::try_from(arrow_schema) + } +} + fn try_parse_index(metadata: &Metadata, key: &str) -> Result> { if let Some(value) = metadata.get(key) { let index = value @@ -205,6 +279,18 @@ fn try_parse_index(metadata: &Metadata, key: &str) -> Result> { } } +fn try_parse_version(metadata: &Metadata, key: &str) -> Result { + if let Some(value) = metadata.get(key) { + let version = value + .parse() + .context(error::ParseSchemaVersionSnafu { value })?; + + Ok(version) + } else { + Ok(Schema::INITIAL_VERSION) + } +} + #[cfg(test)] mod tests { use arrow::datatypes::DataType as ArrowDataType; @@ -223,6 +309,14 @@ mod tests { assert_eq!(column_schema, new_column_schema); } + #[test] + fn test_build_empty_schema() { + let schema = SchemaBuilder::default().build().unwrap(); + assert_eq!(0, schema.num_columns()); + + assert!(SchemaBuilder::default().timestamp_index(0).build().is_err()); + } + #[test] fn test_schema_no_timestamp() { let column_schemas = vec![ @@ -234,6 +328,7 @@ mod tests { assert_eq!(2, schema.num_columns()); assert!(schema.timestamp_index().is_none()); assert!(schema.timestamp_column().is_none()); + assert_eq!(Schema::INITIAL_VERSION, schema.version()); for column_schema in &column_schemas { let found = schema.column_schema_by_name(&column_schema.name).unwrap(); @@ -241,15 +336,25 @@ mod tests { } assert!(schema.column_schema_by_name("col3").is_none()); - let fields: Vec<_> = column_schemas.iter().map(Field::from).collect(); - let arrow_schema = Arc::new(ArrowSchema::from(fields)); - - let new_schema = Schema::try_from(arrow_schema.clone()).unwrap(); + let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap(); assert_eq!(schema, new_schema); assert_eq!(column_schemas, schema.column_schemas()); - assert_eq!(arrow_schema, 
*schema.arrow_schema()); - assert_eq!(arrow_schema, *new_schema.arrow_schema()); + } + + #[test] + fn test_metadata() { + let column_schemas = vec![ColumnSchema::new( + "col1", + ConcreteDataType::int32_datatype(), + false, + )]; + let schema = SchemaBuilder::from(column_schemas) + .add_metadata("k1", "v1") + .build() + .unwrap(); + + assert_eq!("v1", schema.metadata().get("k1").unwrap()); } #[test] @@ -258,10 +363,15 @@ mod tests { ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false), ]; - let schema = Schema::with_timestamp_index(column_schemas.clone(), 1).unwrap(); + let schema = SchemaBuilder::from(column_schemas.clone()) + .timestamp_index(1) + .version(123) + .build() + .unwrap(); assert_eq!(1, schema.timestamp_index().unwrap()); assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap()); + assert_eq!(123, schema.version()); let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap(); assert_eq!(1, schema.timestamp_index().unwrap()); @@ -274,8 +384,17 @@ mod tests { ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false), ]; - assert!(Schema::with_timestamp_index(column_schemas.clone(), 0).is_err()); - assert!(Schema::with_timestamp_index(column_schemas.clone(), 1).is_err()); - assert!(Schema::with_timestamp_index(column_schemas, 2).is_err()); + assert!(SchemaBuilder::from(column_schemas.clone()) + .timestamp_index(0) + .build() + .is_err()); + assert!(SchemaBuilder::from(column_schemas.clone()) + .timestamp_index(1) + .build() + .is_err()); + assert!(SchemaBuilder::from(column_schemas) + .timestamp_index(1) + .build() + .is_err()); } } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 319f98ed24..d40e6cd72b 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -3,7 +3,7 @@ use std::cmp::Ordering; use common_base::bytes::{Bytes, StringBytes}; use datafusion_common::ScalarValue; pub use ordered_float::OrderedFloat; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use crate::prelude::*; @@ -15,7 +15,7 @@ pub type OrderedF64 = OrderedFloat; /// Although compare Value with different data type is allowed, it is recommended to only /// compare Value with same data type. Comparing Value with different data type may not /// behaves as what you expect. 
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub enum Value { Null, @@ -132,30 +132,31 @@ impl From<&[u8]> for Value { } } -impl Serialize for Value { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match self { - Value::Null => serde_json::Value::Null.serialize(serializer), - Value::Boolean(v) => v.serialize(serializer), - Value::UInt8(v) => v.serialize(serializer), - Value::UInt16(v) => v.serialize(serializer), - Value::UInt32(v) => v.serialize(serializer), - Value::UInt64(v) => v.serialize(serializer), - Value::Int8(v) => v.serialize(serializer), - Value::Int16(v) => v.serialize(serializer), - Value::Int32(v) => v.serialize(serializer), - Value::Int64(v) => v.serialize(serializer), - Value::Float32(v) => v.serialize(serializer), - Value::Float64(v) => v.serialize(serializer), - Value::String(bytes) => bytes.serialize(serializer), - Value::Binary(bytes) => bytes.serialize(serializer), - Value::Date(v) => v.serialize(serializer), - Value::DateTime(v) => v.serialize(serializer), - Value::List(_) => unimplemented!(), - } +impl TryFrom for serde_json::Value { + type Error = serde_json::Error; + + fn try_from(value: Value) -> serde_json::Result { + let json_value = match value { + Value::Null => serde_json::Value::Null, + Value::Boolean(v) => serde_json::Value::Bool(v), + Value::UInt8(v) => serde_json::Value::from(v), + Value::UInt16(v) => serde_json::Value::from(v), + Value::UInt32(v) => serde_json::Value::from(v), + Value::UInt64(v) => serde_json::Value::from(v), + Value::Int8(v) => serde_json::Value::from(v), + Value::Int16(v) => serde_json::Value::from(v), + Value::Int32(v) => serde_json::Value::from(v), + Value::Int64(v) => serde_json::Value::from(v), + Value::Float32(v) => serde_json::Value::from(v.0), + Value::Float64(v) => serde_json::Value::from(v.0), + Value::String(bytes) => serde_json::Value::String(bytes.as_utf8().to_string()), + Value::Binary(bytes) => serde_json::to_value(bytes)?, + Value::Date(v) => serde_json::Value::Number(v.into()), + Value::DateTime(v) => serde_json::Value::Number(v.into()), + Value::List(v) => serde_json::to_value(v)?, + }; + + Ok(json_value) } } @@ -437,4 +438,80 @@ mod tests { assert_eq!(ScalarValue::Boolean(None), Value::Null.into()); } + + fn to_json(value: Value) -> serde_json::Value { + value.try_into().unwrap() + } + + #[test] + fn test_to_json_value() { + assert_eq!(serde_json::Value::Null, to_json(Value::Null)); + assert_eq!(serde_json::Value::Bool(true), to_json(Value::Boolean(true))); + assert_eq!( + serde_json::Value::Number(20u8.into()), + to_json(Value::UInt8(20)) + ); + assert_eq!( + serde_json::Value::Number(20i8.into()), + to_json(Value::Int8(20)) + ); + assert_eq!( + serde_json::Value::Number(2000u16.into()), + to_json(Value::UInt16(2000)) + ); + assert_eq!( + serde_json::Value::Number(2000i16.into()), + to_json(Value::Int16(2000)) + ); + assert_eq!( + serde_json::Value::Number(3000u32.into()), + to_json(Value::UInt32(3000)) + ); + assert_eq!( + serde_json::Value::Number(3000i32.into()), + to_json(Value::Int32(3000)) + ); + assert_eq!( + serde_json::Value::Number(4000u64.into()), + to_json(Value::UInt64(4000)) + ); + assert_eq!( + serde_json::Value::Number(4000i64.into()), + to_json(Value::Int64(4000)) + ); + assert_eq!( + serde_json::Value::from(125.0f32), + to_json(Value::Float32(125.0.into())) + ); + assert_eq!( + serde_json::Value::from(125.0f64), + to_json(Value::Float64(125.0.into())) + ); + 
assert_eq!( + serde_json::Value::String(String::from("hello")), + to_json(Value::String(StringBytes::from("hello"))) + ); + assert_eq!( + serde_json::Value::from(b"world".as_slice()), + to_json(Value::Binary(Bytes::from(b"world".as_slice()))) + ); + assert_eq!( + serde_json::Value::Number(5000i32.into()), + to_json(Value::Date(5000)) + ); + assert_eq!( + serde_json::Value::Number(5000i64.into()), + to_json(Value::DateTime(5000)) + ); + + let json_value: serde_json::Value = + serde_json::from_str(r#"{"items":[{"Int32":123}],"datatype":{"Int32":{}}}"#).unwrap(); + assert_eq!( + json_value, + to_json(Value::List(ListValue { + items: Some(Box::new(vec![Value::Int32(123)])), + datatype: ConcreteDataType::int32_datatype(), + })) + ); + } } diff --git a/src/datatypes/src/vectors/constant.rs b/src/datatypes/src/vectors/constant.rs index 62e0dcafb8..5841fb6213 100644 --- a/src/datatypes/src/vectors/constant.rs +++ b/src/datatypes/src/vectors/constant.rs @@ -114,7 +114,7 @@ impl Serializable for ConstantVector { fn serialize_to_json(&self) -> Result> { std::iter::repeat(self.try_get(0)?) .take(self.len()) - .map(serde_json::to_value) + .map(serde_json::Value::try_from) .collect::>() .context(SerializeSnafu) } diff --git a/src/storage/benches/memtable/util/mod.rs b/src/storage/benches/memtable/util/mod.rs index a5cdba93f1..80b7a4c481 100644 --- a/src/storage/benches/memtable/util/mod.rs +++ b/src/storage/benches/memtable/util/mod.rs @@ -4,21 +4,23 @@ pub mod schema_util; use datatypes::type_id::LogicalTypeId; use storage::{ - memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableRef, MemtableSchema}, + memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableRef}, metadata::RegionMetadata, + schema::RegionSchemaRef, }; use crate::memtable::util::regiondesc_util::RegionDescBuilder; pub const TIMESTAMP_NAME: &str = "timestamp"; -pub fn schema_for_test() -> MemtableSchema { +pub fn schema_for_test() -> RegionSchemaRef { let desc = RegionDescBuilder::new("bench") .push_value_column(("v1", LogicalTypeId::UInt64, true)) .push_value_column(("v2", LogicalTypeId::String, true)) .build(); let metadata: RegionMetadata = desc.try_into().unwrap(); - MemtableSchema::new(metadata.columns_row_key) + + metadata.schema().clone() } pub fn new_memtable() -> MemtableRef { diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index a445053574..772e2a4cc0 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -175,14 +175,10 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Parquet file schema is not valid: {}", msg))] - SequenceColumnNotFound { msg: String, backtrace: Backtrace }, - - #[snafu(display("Parquet file schema is not valid, msg: {}, source: {}", msg, source))] + #[snafu(display("Parquet file schema is invalid, source: {}", source))] InvalidParquetSchema { - msg: String, #[snafu(backtrace)] - source: datatypes::error::Error, + source: crate::schema::Error, }, #[snafu(display("Region is under {} state, cannot proceed operation", state))] @@ -221,6 +217,20 @@ pub enum Error { given: SequenceNumber, backtrace: Backtrace, }, + + #[snafu(display("Failed to convert sst schema, file: {}, source: {}", file, source))] + ConvertSstSchema { + file: String, + #[snafu(backtrace)] + source: crate::schema::Error, + }, + + #[snafu(display("Invalid raw region metadata, region: {}, source: {}", region, source))] + InvalidRawRegion { + region: String, + #[snafu(backtrace)] + source: MetadataError, + }, } pub type Result = std::result::Result; @@ -245,10 +255,11 @@ impl ErrorExt 
for Error { | DecodeMetaActionList { .. } | Readline { .. } | InvalidParquetSchema { .. } - | SequenceColumnNotFound { .. } | WalDataCorrupted { .. } | VersionNotFound { .. } - | SequenceNotMonotonic { .. } => StatusCode::Unexpected, + | SequenceNotMonotonic { .. } + | ConvertSstSchema { .. } + | InvalidRawRegion { .. } => StatusCode::Unexpected, FlushIo { .. } | WriteParquet { .. } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index cc21e85f2e..059f21ce66 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -199,7 +199,7 @@ impl FlushJob { async fn write_to_manifest(&self, file_metas: &[FileMeta]) -> Result { let edit = RegionEdit { - region_version: self.shared.version_control.metadata().version, + region_version: self.shared.version_control.metadata().version(), flushed_sequence: self.flush_sequence, files_to_add: file_metas.to_vec(), files_to_remove: Vec::default(), diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 8b31f52122..627eb37c17 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -14,6 +14,7 @@ pub mod metadata; mod proto; mod read; mod region; +pub mod schema; mod snapshot; mod sst; mod sync; diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index 6b8fd3de69..beb8479802 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -14,12 +14,38 @@ use crate::error::{ ReadlineSnafu, Result, }; use crate::manifest::helper; -use crate::metadata::{RegionMetadataRef, VersionNumber}; +use crate::metadata::{ColumnFamilyMetadata, ColumnMetadata, VersionNumber}; use crate::sst::FileMeta; +/// Minimal data that could be used to persist and recover [RegionMetadata](crate::metadata::RegionMetadata). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RawRegionMetadata { + pub id: RegionId, + pub name: String, + pub columns: RawColumnsMetadata, + pub column_families: RawColumnFamiliesMetadata, + pub version: VersionNumber, +} + +/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RawColumnsMetadata { + pub columns: Vec, + pub row_key_end: usize, + pub timestamp_key_index: usize, + pub enable_version_column: bool, + pub user_column_end: usize, +} + +/// Minimal data that could be used to persist and recover [ColumnFamiliesMetadata](crate::metadata::ColumnFamiliesMetadata). 
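These `Raw*` structs are the plain-data form that actually gets persisted: `RegionChange` now carries a `RawRegionMetadata` instead of a `RegionMetadataRef`. A rough sketch of the round trip (serde_json is used here only for illustration; the conversions are the ones added in `metadata.rs` below):

```rust
// `metadata` is assumed to be a RegionMetadataRef built by the usual test helpers.
let raw: RawRegionMetadata = (&*metadata).into();

// The raw form derives Serialize/Deserialize, so it can be encoded into the manifest...
let encoded = serde_json::to_string(&raw).unwrap();

// ...and recovered later; RegionMetadata::try_from rebuilds the RegionSchema from
// the stored columns and schema version.
let decoded: RawRegionMetadata = serde_json::from_str(&encoded).unwrap();
let recovered = RegionMetadata::try_from(decoded).unwrap();
assert_eq!(*metadata, recovered);
```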
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RawColumnFamiliesMetadata { + pub column_families: Vec, +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct RegionChange { - pub metadata: RegionMetadataRef, + pub metadata: RawRegionMetadata, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -35,12 +61,6 @@ pub struct RegionEdit { pub files_to_remove: Vec, } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct RegionMetaActionList { - pub actions: Vec, - pub prev_version: ManifestVersion, -} - #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum RegionMetaAction { Protocol(ProtocolAction), @@ -49,6 +69,12 @@ pub enum RegionMetaAction { Edit(RegionEdit), } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct RegionMetaActionList { + pub actions: Vec, + pub prev_version: ManifestVersion, +} + impl RegionMetaActionList { pub fn with_action(action: RegionMetaAction) -> Self { Self { diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 92aea2980e..633262977f 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -14,6 +14,7 @@ mod tests { use super::*; use crate::manifest::test_utils::*; + use crate::metadata::RegionMetadata; #[tokio::test] async fn test_region_manifest() { @@ -43,7 +44,7 @@ mod tests { manifest .update(RegionMetaActionList::with_action(RegionMetaAction::Change( RegionChange { - metadata: region_meta.clone(), + metadata: (&*region_meta).into(), }, ))) .await @@ -58,7 +59,10 @@ mod tests { match action { RegionMetaAction::Change(c) => { - assert_eq!(c.metadata, region_meta); + assert_eq!( + RegionMetadata::try_from(c.metadata.clone()).unwrap(), + *region_meta + ); } _ => unreachable!(), } @@ -79,7 +83,10 @@ mod tests { let action = &action_list.actions[0]; match action { RegionMetaAction::Change(c) => { - assert_eq!(c.metadata, region_meta); + assert_eq!( + RegionMetadata::try_from(c.metadata.clone()).unwrap(), + *region_meta + ); } _ => unreachable!(), } diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index d8aac2523e..0c3adaf9db 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -1,6 +1,5 @@ mod btree; mod inserter; -mod schema; #[cfg(test)] pub mod tests; mod version; @@ -13,9 +12,9 @@ use store_api::storage::{consts, SequenceNumber, ValueType}; use crate::error::Result; use crate::memtable::btree::BTreeMemtable; pub use crate::memtable::inserter::Inserter; -pub use crate::memtable::schema::MemtableSchema; pub use crate::memtable::version::{MemtableSet, MemtableVersion}; use crate::read::Batch; +use crate::schema::RegionSchemaRef; /// Unique id for memtables under same region. pub type MemtableId = u32; @@ -24,7 +23,7 @@ pub type MemtableId = u32; pub trait Memtable: Send + Sync + std::fmt::Debug { fn id(&self) -> MemtableId; - fn schema(&self) -> &MemtableSchema; + fn schema(&self) -> RegionSchemaRef; /// Write key/values to the memtable. /// @@ -83,7 +82,7 @@ pub enum RowOrdering { /// as an async trait. pub trait BatchIterator: Iterator> + Send + Sync { /// Returns the schema of this iterator. - fn schema(&self) -> &MemtableSchema; + fn schema(&self) -> RegionSchemaRef; /// Returns the ordering of the output rows from this iterator. 
fn ordering(&self) -> RowOrdering; @@ -92,7 +91,7 @@ pub trait BatchIterator: Iterator> + Send + Sync { pub type BoxedBatchIterator = Box; pub trait MemtableBuilder: Send + Sync + std::fmt::Debug { - fn build(&self, id: MemtableId, schema: MemtableSchema) -> MemtableRef; + fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef; } pub type MemtableBuilderRef = Arc; @@ -136,7 +135,7 @@ impl KeyValues { pub struct DefaultMemtableBuilder; impl MemtableBuilder for DefaultMemtableBuilder { - fn build(&self, id: MemtableId, schema: MemtableSchema) -> MemtableRef { + fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef { Arc::new(BTreeMemtable::new(id, schema)) } } diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 24911045c6..08ed6c7908 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -15,10 +15,10 @@ use store_api::storage::{SequenceNumber, ValueType}; use crate::error::Result; use crate::memtable::{ - BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, - MemtableSchema, RowOrdering, + BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, RowOrdering, }; use crate::read::Batch; +use crate::schema::RegionSchemaRef; type RwLockMap = RwLock>; @@ -28,13 +28,13 @@ type RwLockMap = RwLock>; #[derive(Debug)] pub struct BTreeMemtable { id: MemtableId, - schema: MemtableSchema, + schema: RegionSchemaRef, map: Arc, estimated_bytes: AtomicUsize, } impl BTreeMemtable { - pub fn new(id: MemtableId, schema: MemtableSchema) -> BTreeMemtable { + pub fn new(id: MemtableId, schema: RegionSchemaRef) -> BTreeMemtable { BTreeMemtable { id, schema, @@ -49,8 +49,8 @@ impl Memtable for BTreeMemtable { self.id } - fn schema(&self) -> &MemtableSchema { - &self.schema + fn schema(&self) -> RegionSchemaRef { + self.schema.clone() } fn write(&self, kvs: &KeyValues) -> Result<()> { @@ -81,14 +81,14 @@ impl Memtable for BTreeMemtable { struct BTreeIterator { ctx: IterContext, - schema: MemtableSchema, + schema: RegionSchemaRef, map: Arc, last_key: Option, } impl BatchIterator for BTreeIterator { - fn schema(&self) -> &MemtableSchema { - &self.schema + fn schema(&self) -> RegionSchemaRef { + self.schema.clone() } fn ordering(&self) -> RowOrdering { @@ -105,7 +105,7 @@ impl Iterator for BTreeIterator { } impl BTreeIterator { - fn new(ctx: IterContext, schema: MemtableSchema, map: Arc) -> BTreeIterator { + fn new(ctx: IterContext, schema: RegionSchemaRef, map: Arc) -> BTreeIterator { BTreeIterator { ctx, schema, diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index a54680615d..85a4fea6a4 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -293,10 +293,9 @@ mod tests { use store_api::storage::{PutOperation, WriteRequest}; use super::*; - use crate::memtable::{ - DefaultMemtableBuilder, IterContext, MemtableBuilder, MemtableId, MemtableSchema, - }; + use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder, MemtableId}; use crate::metadata::RegionMetadata; + use crate::schema::RegionSchemaRef; use crate::test_util::descriptor_util::RegionDescBuilder; use crate::test_util::write_batch_util; @@ -558,7 +557,7 @@ mod tests { ) } - fn new_memtable_schema() -> MemtableSchema { + fn new_region_schema() -> RegionSchemaRef { let desc = RegionDescBuilder::new("test") .timestamp(("ts", LogicalTypeId::Int64, false)) .push_value_column(("value", LogicalTypeId::Int64, true)) 
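The pattern repeated across these test hunks: `MemtableSchema` is gone, and memtables are built from the `RegionSchemaRef` owned by the region metadata. Condensed, using the same builders as the surrounding tests:

```rust
let desc = RegionDescBuilder::new("test")
    .timestamp(("ts", LogicalTypeId::Int64, false))
    .push_value_column(("value", LogicalTypeId::Int64, true))
    .build();
let metadata: RegionMetadata = desc.try_into().unwrap();

// The region schema (user columns plus internal columns) now lives on the metadata.
let schema: RegionSchemaRef = metadata.schema().clone();
let memtable = DefaultMemtableBuilder {}.build(1, schema.clone());
assert_eq!(schema, memtable.schema());
```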
@@ -566,7 +565,7 @@ mod tests { .build(); let metadata: RegionMetadata = desc.try_into().unwrap(); - MemtableSchema::new(metadata.columns_row_key) + metadata.schema().clone() } fn put_batch(batch: &mut WriteBatch, data: &[(i64, Option)]) { @@ -579,7 +578,7 @@ mod tests { batch.put(put_data).unwrap(); } - fn new_memtable_set(time_ranges: &[RangeMillis], schema: &MemtableSchema) -> MemtableSet { + fn new_memtable_set(time_ranges: &[RangeMillis], schema: &RegionSchemaRef) -> MemtableSet { let mut set = MemtableSet::new(); for (id, range) in time_ranges.iter().enumerate() { let mem = DefaultMemtableBuilder {}.build(id as MemtableId, schema.clone()); @@ -619,7 +618,7 @@ mod tests { let sequence = 11111; let bucket_duration = 100; let time_ranges = new_time_ranges(&[0], bucket_duration); - let memtable_schema = new_memtable_schema(); + let memtable_schema = new_region_schema(); let memtables = new_memtable_set(&time_ranges, &memtable_schema); let mut inserter = Inserter::new( sequence, @@ -657,7 +656,7 @@ mod tests { let sequence = 11111; let bucket_duration = 100; let time_ranges = new_time_ranges(&[0, 100, 200], bucket_duration); - let memtable_schema = new_memtable_schema(); + let memtable_schema = new_region_schema(); let memtables = new_memtable_set(&time_ranges, &memtable_schema); let mut inserter = Inserter::new( sequence, diff --git a/src/storage/src/memtable/schema.rs b/src/storage/src/memtable/schema.rs deleted file mode 100644 index e23c210de0..0000000000 --- a/src/storage/src/memtable/schema.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::metadata::{ColumnMetadata, ColumnsRowKeyMetadataRef}; - -#[derive(Clone, Debug, PartialEq)] -pub struct MemtableSchema { - columns_row_key: ColumnsRowKeyMetadataRef, -} - -impl MemtableSchema { - pub fn new(columns_row_key: ColumnsRowKeyMetadataRef) -> MemtableSchema { - MemtableSchema { columns_row_key } - } - - #[inline] - pub fn row_key_columns(&self) -> impl Iterator { - self.columns_row_key.iter_row_key_columns() - } - - #[inline] - pub fn value_columns(&self) -> impl Iterator { - self.columns_row_key.iter_value_columns() - } - - #[inline] - pub fn num_row_key_columns(&self) -> usize { - self.columns_row_key.num_row_key_columns() - } - - #[inline] - pub fn num_value_columns(&self) -> usize { - self.columns_row_key.num_value_columns() - } -} diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index 6f31213193..0cf53bda5c 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -4,6 +4,7 @@ use datatypes::vectors::{Int64VectorBuilder, UInt64VectorBuilder}; use super::*; use crate::metadata::RegionMetadata; +use crate::schema::RegionSchemaRef; use crate::test_util::descriptor_util::RegionDescBuilder; // For simplicity, all memtables in test share same memtable id. @@ -12,15 +13,15 @@ const MEMTABLE_ID: MemtableId = 1; // Schema for testing memtable: // - key: Int64(timestamp), UInt64(version), // - value: UInt64 -pub fn schema_for_test() -> MemtableSchema { - // Just build a region desc and use its columns_row_key metadata. +pub fn schema_for_test() -> RegionSchemaRef { + // Just build a region desc and use its columns metadata. 
let desc = RegionDescBuilder::new("test") .enable_version_column(true) .push_value_column(("v1", LogicalTypeId::UInt64, true)) .build(); let metadata: RegionMetadata = desc.try_into().unwrap(); - MemtableSchema::new(metadata.columns_row_key) + metadata.schema().clone() } fn kvs_for_test_with_index( @@ -128,10 +129,8 @@ fn check_iter_content( assert_eq!(keys.len(), index); } -// TODO(yingwen): Check size of the returned batch. - struct MemtableTester { - schema: MemtableSchema, + schema: RegionSchemaRef, builders: Vec, } @@ -172,7 +171,7 @@ impl MemtableTester { } struct TestContext { - schema: MemtableSchema, + schema: RegionSchemaRef, memtable: MemtableRef, } @@ -216,7 +215,7 @@ fn write_iter_memtable_case(ctx: &TestContext) { ..Default::default() }; let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); - assert_eq!(ctx.schema, *iter.schema()); + assert_eq!(ctx.schema, iter.schema()); assert_eq!(RowOrdering::Key, iter.ordering()); check_iter_content( diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs index b9df711905..8e84d5eb9a 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -4,30 +4,42 @@ use std::sync::Arc; use common_error::prelude::*; use datatypes::data_type::ConcreteDataType; use serde::{Deserialize, Serialize}; -use snafu::ensure; +use snafu::{ensure, OptionExt}; use store_api::storage::{ - consts, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId, - ColumnId, ColumnSchema, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptor, Schema, - SchemaRef, + consts::{self, ReservedColumnId}, + ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId, ColumnId, + ColumnSchema, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptor, Schema, SchemaRef, }; +use crate::manifest::action::{RawColumnFamiliesMetadata, RawColumnsMetadata, RawRegionMetadata}; +use crate::schema::{RegionSchema, RegionSchemaRef}; + /// Error for handling metadata. #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Column name already exists, name: {}", name))] + #[snafu(display("Column name {} already exists", name))] ColNameExists { name: String, backtrace: Backtrace }, - #[snafu(display("Column family name already exists, name: {}", name))] + #[snafu(display("Column family name {} already exists", name))] CfNameExists { name: String, backtrace: Backtrace }, - #[snafu(display("Column family id already exists, id: {}", id))] + #[snafu(display("Column family id {} already exists", id))] CfIdExists { id: ColumnId, backtrace: Backtrace }, + #[snafu(display("Column id {} already exists", id))] + ColIdExists { id: ColumnId, backtrace: Backtrace }, + #[snafu(display("Failed to build schema, source: {}", source))] InvalidSchema { - source: datatypes::error::Error, - backtrace: Backtrace, + #[snafu(backtrace)] + source: crate::schema::Error, }, + + #[snafu(display("Column name {} is reserved by the system", name))] + ReservedColumn { name: String, backtrace: Backtrace }, + + #[snafu(display("Missing timestamp key column"))] + MissingTimestamp { backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -48,34 +60,27 @@ impl RegionMetaImpl { impl RegionMeta for RegionMetaImpl { fn schema(&self) -> &SchemaRef { - &self.metadata.schema + self.metadata.user_schema() } } pub type VersionNumber = u32; -// TODO(yingwen): Make some fields of metadata private. +// TODO(yingwen): We may need to hold a list of history schema. /// In memory metadata of region. 
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct RegionMetadata { // The following fields are immutable. id: RegionId, name: String, // The following fields are mutable. - /// Schema of the region. - /// - /// Holding a [SchemaRef] to allow converting into `SchemaRef`/`arrow::SchemaRef` - /// conveniently. The fields order in `SchemaRef` **must** be consistent with - /// columns order in [ColumnsMetadata] to ensure the projection index of a field - /// is correct. - pub schema: SchemaRef, - pub columns_row_key: ColumnsRowKeyMetadataRef, - pub column_families: ColumnFamiliesMetadata, - /// Version of the metadata. Version is set to zero initially and bumped once the - /// metadata have been altered. - pub version: VersionNumber, + /// Latest schema of the region. + schema: RegionSchemaRef, + pub columns: ColumnsMetadataRef, + column_families: ColumnFamiliesMetadata, + version: VersionNumber, } impl RegionMetadata { @@ -88,66 +93,171 @@ impl RegionMetadata { pub fn name(&self) -> &str { &self.name } + + #[inline] + pub fn schema(&self) -> &RegionSchemaRef { + &self.schema + } + + #[inline] + pub fn user_schema(&self) -> &SchemaRef { + self.schema.user_schema() + } + + #[inline] + pub fn version(&self) -> u32 { + self.schema.version() + } } pub type RegionMetadataRef = Arc; +impl From<&RegionMetadata> for RawRegionMetadata { + fn from(data: &RegionMetadata) -> RawRegionMetadata { + RawRegionMetadata { + id: data.id, + name: data.name.clone(), + columns: (&*data.columns).into(), + column_families: (&data.column_families).into(), + version: data.version, + } + } +} + +impl TryFrom for RegionMetadata { + type Error = Error; + + fn try_from(raw: RawRegionMetadata) -> Result { + let columns = Arc::new(ColumnsMetadata::from(raw.columns)); + let schema = + Arc::new(RegionSchema::new(columns.clone(), raw.version).context(InvalidSchemaSnafu)?); + + Ok(RegionMetadata { + id: raw.id, + name: raw.name, + schema, + columns, + column_families: raw.column_families.into(), + version: raw.version, + }) + } +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ColumnMetadata { pub cf_id: ColumnFamilyId, pub desc: ColumnDescriptor, } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq)] pub struct ColumnsMetadata { - /// All columns, in `(key columns, timestamp, [version,] value columns)` order. + /// All columns. /// - /// Columns order should be consistent with fields order in [SchemaRef]. - pub columns: Vec, + /// Columns are organized in the following order: + /// ```text + /// key columns, timestamp, [version,] value columns, internal columns + /// ``` + /// + /// The key columns, timestamp and version forms the row key. + columns: Vec, /// Maps column name to index of columns, used to fast lookup column by name. - pub name_to_col_index: HashMap, -} - -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] -pub struct RowKeyMetadata { + name_to_col_index: HashMap, /// Exclusive end index of row key columns. row_key_end: usize, /// Index of timestamp key column. - pub timestamp_key_index: usize, + timestamp_key_index: usize, /// If version column is enabled, then the last column of key columns is a /// version column. - pub enable_version_column: bool, + enable_version_column: bool, + /// Exclusive end index of user columns. + /// + /// Columns in `[user_column_end..)` are internal columns. 
+ user_column_end: usize, } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct ColumnsRowKeyMetadata { - columns: ColumnsMetadata, - row_key: RowKeyMetadata, -} - -impl ColumnsRowKeyMetadata { +impl ColumnsMetadata { + /// Returns an iterator to all row key columns. + /// + /// Row key columns includes all key columns, the timestamp column and the + /// optional version column. pub fn iter_row_key_columns(&self) -> impl Iterator { - self.columns.columns.iter().take(self.row_key.row_key_end) + self.columns.iter().take(self.row_key_end) } + /// Returns an iterator to all value columns (internal columns are excluded). pub fn iter_value_columns(&self) -> impl Iterator { - self.columns.columns.iter().skip(self.row_key.row_key_end) + self.columns[self.row_key_end..self.user_column_end].iter() + } + + pub fn iter_user_columns(&self) -> impl Iterator { + self.columns.iter().take(self.user_column_end) + } + + pub fn iter_all_columns(&self) -> impl Iterator { + self.columns.iter() } #[inline] pub fn num_row_key_columns(&self) -> usize { - self.row_key.row_key_end + self.row_key_end } #[inline] pub fn num_value_columns(&self) -> usize { - self.columns.columns.len() - self.row_key.row_key_end + self.user_column_end - self.row_key_end + } + + #[inline] + pub fn timestamp_key_index(&self) -> usize { + self.timestamp_key_index + } + + #[inline] + pub fn row_key_end(&self) -> usize { + self.row_key_end + } + + #[inline] + pub fn user_column_end(&self) -> usize { + self.user_column_end } } -pub type ColumnsRowKeyMetadataRef = Arc; +pub type ColumnsMetadataRef = Arc; -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +impl From<&ColumnsMetadata> for RawColumnsMetadata { + fn from(data: &ColumnsMetadata) -> RawColumnsMetadata { + RawColumnsMetadata { + columns: data.columns.clone(), + row_key_end: data.row_key_end, + timestamp_key_index: data.timestamp_key_index, + enable_version_column: data.enable_version_column, + user_column_end: data.user_column_end, + } + } +} + +impl From for ColumnsMetadata { + fn from(raw: RawColumnsMetadata) -> ColumnsMetadata { + let name_to_col_index = raw + .columns + .iter() + .enumerate() + .map(|(i, col)| (col.desc.name.clone(), i)) + .collect(); + + ColumnsMetadata { + columns: raw.columns, + name_to_col_index, + row_key_end: raw.row_key_end, + timestamp_key_index: raw.timestamp_key_index, + enable_version_column: raw.enable_version_column, + user_column_end: raw.user_column_end, + } + } +} + +#[derive(Clone, Debug, PartialEq)] pub struct ColumnFamiliesMetadata { /// Map column family id to column family metadata. id_to_cfs: HashMap, @@ -159,6 +269,24 @@ impl ColumnFamiliesMetadata { } } +impl From<&ColumnFamiliesMetadata> for RawColumnFamiliesMetadata { + fn from(data: &ColumnFamiliesMetadata) -> RawColumnFamiliesMetadata { + let column_families = data.id_to_cfs.values().cloned().collect(); + RawColumnFamiliesMetadata { column_families } + } +} + +impl From for ColumnFamiliesMetadata { + fn from(raw: RawColumnFamiliesMetadata) -> ColumnFamiliesMetadata { + let id_to_cfs = raw + .column_families + .into_iter() + .map(|cf| (cf.cf_id, cf)) + .collect(); + ColumnFamiliesMetadata { id_to_cfs } + } +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct ColumnFamilyMetadata { /// Column family name. @@ -189,6 +317,7 @@ impl TryFrom for RegionMetadata { } } +// TODO(yingwen): Add a builder to build ColumnMetadata and refactor this builder. 
#[derive(Default)] struct RegionMetadataBuilder { id: RegionId, @@ -196,8 +325,13 @@ struct RegionMetadataBuilder { columns: Vec, column_schemas: Vec, name_to_col_index: HashMap, + /// Column id set, used to validate column id uniqueness. + column_ids: HashSet, - row_key: RowKeyMetadata, + // Row key metadata: + row_key_end: usize, + timestamp_key_index: Option, + enable_version_column: bool, id_to_cfs: HashMap, cf_names: HashSet, @@ -224,7 +358,7 @@ impl RegionMetadataBuilder { } // TODO(yingwen): Validate this is a timestamp column. - let timestamp_key_index = self.columns.len(); + self.timestamp_key_index = Some(self.columns.len()); self.push_row_key_column(key.timestamp)?; if key.enable_version_column { @@ -233,13 +367,8 @@ impl RegionMetadataBuilder { self.push_row_key_column(version_col)?; } - let row_key_end = self.columns.len(); - - self.row_key = RowKeyMetadata { - row_key_end, - timestamp_key_index, - enable_version_column: key.enable_version_column, - }; + self.row_key_end = self.columns.len(); + self.enable_version_column = key.enable_version_column; Ok(self) } @@ -274,29 +403,33 @@ impl RegionMetadataBuilder { Ok(self) } - fn build(self) -> Result { - let schema = if self.column_schemas.is_empty() { - Arc::new(Schema::new(self.column_schemas)) - } else { - Arc::new( - Schema::with_timestamp_index(self.column_schemas, self.row_key.timestamp_key_index) - .context(InvalidSchemaSnafu)?, - ) - }; - let columns = ColumnsMetadata { + fn build(mut self) -> Result { + let timestamp_key_index = self.timestamp_key_index.context(MissingTimestampSnafu)?; + + let user_column_end = self.columns.len(); + // Setup internal columns. + for internal_desc in internal_column_descs() { + self.push_new_column(consts::DEFAULT_CF_ID, internal_desc)?; + } + + let columns = Arc::new(ColumnsMetadata { columns: self.columns, name_to_col_index: self.name_to_col_index, - }; - let columns_row_key = Arc::new(ColumnsRowKeyMetadata { - columns, - row_key: self.row_key, + row_key_end: self.row_key_end, + timestamp_key_index, + enable_version_column: self.enable_version_column, + user_column_end, }); + let schema = Arc::new( + RegionSchema::new(columns.clone(), Schema::INITIAL_VERSION) + .context(InvalidSchemaSnafu)?, + ); Ok(RegionMetadata { id: self.id, name: self.name, schema, - columns_row_key, + columns, column_families: ColumnFamiliesMetadata { id_to_cfs: self.id_to_cfs, }, @@ -311,22 +444,35 @@ impl RegionMetadataBuilder { } fn push_value_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> { + ensure!( + !is_internal_value_column(&desc.name), + ReservedColumnSnafu { name: &desc.name } + ); + + self.push_new_column(cf_id, desc) + } + + fn push_new_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> { ensure!( !self.name_to_col_index.contains_key(&desc.name), ColNameExistsSnafu { name: &desc.name } ); + ensure!( + !self.column_ids.contains(&desc.id), + ColIdExistsSnafu { id: desc.id } + ); let column_schema = ColumnSchema::from(&desc); let column_name = desc.name.clone(); + let column_id = desc.id; let meta = ColumnMetadata { cf_id, desc }; - // TODO(yingwen): Store cf_id to metadata in field. 
- let column_index = self.columns.len(); self.columns.push(meta); self.column_schemas.push(column_schema); self.name_to_col_index.insert(column_name, column_index); + self.column_ids.insert(column_id); Ok(()) } @@ -334,7 +480,7 @@ impl RegionMetadataBuilder { fn version_column_desc() -> ColumnDescriptor { ColumnDescriptorBuilder::new( - consts::VERSION_COLUMN_ID, + ReservedColumnId::version(), consts::VERSION_COLUMN_NAME.to_string(), ConcreteDataType::uint64_datatype(), ) @@ -343,6 +489,36 @@ fn version_column_desc() -> ColumnDescriptor { .unwrap() } +fn internal_column_descs() -> [ColumnDescriptor; 2] { + [ + ColumnDescriptorBuilder::new( + ReservedColumnId::sequence(), + consts::SEQUENCE_COLUMN_NAME.to_string(), + ConcreteDataType::uint64_datatype(), + ) + .is_nullable(false) + .build() + .unwrap(), + ColumnDescriptorBuilder::new( + ReservedColumnId::value_type(), + consts::VALUE_TYPE_COLUMN_NAME.to_string(), + ConcreteDataType::uint8_datatype(), + ) + .is_nullable(false) + .build() + .unwrap(), + ] +} + +/// Returns true if this is an internal column for value column. +#[inline] +fn is_internal_value_column(column_name: &str) -> bool { + matches!( + column_name, + consts::SEQUENCE_COLUMN_NAME | consts::VALUE_TYPE_COLUMN_NAME + ) +} + // TODO(yingwen): Add tests for using invalid row_key/cf to build metadata. #[cfg(test)] mod tests { @@ -378,33 +554,95 @@ mod tests { let metadata = RegionMetadata::try_from(desc).unwrap(); assert_eq!(region_name, metadata.name); - assert_eq!(expect_schema, metadata.schema); - assert_eq!(2, metadata.columns_row_key.num_row_key_columns()); - assert_eq!(1, metadata.columns_row_key.num_value_columns()); + assert_eq!(expect_schema, *metadata.user_schema()); + assert_eq!(2, metadata.columns.num_row_key_columns()); + assert_eq!(1, metadata.columns.num_value_columns()); } #[test] fn test_build_empty_region_metadata() { - let metadata = RegionMetadataBuilder::default().build().unwrap(); - assert!(metadata.schema.column_schemas().is_empty()); + let err = RegionMetadataBuilder::default().build().err().unwrap(); + assert!(matches!(err, Error::MissingTimestamp { .. })); + } - assert!(metadata.columns_row_key.columns.columns.is_empty()); - assert_eq!(0, metadata.columns_row_key.num_row_key_columns()); - assert!(metadata - .columns_row_key - .iter_row_key_columns() - .next() - .is_none()); - assert_eq!(0, metadata.columns_row_key.num_value_columns()); - assert!(metadata - .columns_row_key - .iter_value_columns() - .next() - .is_none()); + #[test] + fn test_build_metadata_duplicate_name() { + let cf = ColumnFamilyDescriptorBuilder::default() + .push_column( + ColumnDescriptorBuilder::new(4, "v1", ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .push_column( + ColumnDescriptorBuilder::new(5, "v1", ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .build() + .unwrap(); + let err = RegionMetadataBuilder::new() + .add_column_family(cf) + .err() + .unwrap(); + assert!(matches!(err, Error::ColNameExists { .. 
})); + } - assert!(metadata.column_families.id_to_cfs.is_empty()); + #[test] + fn test_build_metadata_internal_name() { + let names = [consts::SEQUENCE_COLUMN_NAME, consts::VALUE_TYPE_COLUMN_NAME]; + for name in names { + let cf = ColumnFamilyDescriptorBuilder::default() + .push_column( + ColumnDescriptorBuilder::new(5, name, ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .build() + .unwrap(); + let err = RegionMetadataBuilder::new() + .add_column_family(cf) + .err() + .unwrap(); + assert!(matches!(err, Error::ReservedColumn { .. })); + } + } - assert_eq!(0, metadata.version); + #[test] + fn test_build_metadata_duplicate_id() { + let cf = ColumnFamilyDescriptorBuilder::default() + .push_column( + ColumnDescriptorBuilder::new(4, "v1", ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .push_column( + ColumnDescriptorBuilder::new(4, "v2", ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .build() + .unwrap(); + let err = RegionMetadataBuilder::new() + .add_column_family(cf) + .err() + .unwrap(); + assert!(matches!(err, Error::ColIdExists { .. })); + + let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::int64_datatype()) + .is_nullable(false) + .build() + .unwrap(); + let row_key = RowKeyDescriptorBuilder::new(timestamp) + .push_column( + ColumnDescriptorBuilder::new(2, "k1", ConcreteDataType::int64_datatype()) + .is_nullable(false) + .build() + .unwrap(), + ) + .build() + .unwrap(); + let err = RegionMetadataBuilder::new().row_key(row_key).err().unwrap(); + assert!(matches!(err, Error::ColIdExists { .. })); } fn new_metadata(enable_version_column: bool) -> RegionMetadata { @@ -454,30 +692,30 @@ mod tests { Some(1), ); - assert_eq!(expect_schema, metadata.schema); + assert_eq!(expect_schema, *metadata.user_schema()); - // 3 columns - assert_eq!(3, metadata.columns_row_key.columns.columns.len()); + // 3 user columns and 2 internal columns + assert_eq!(5, metadata.columns.columns.len()); // 2 row key columns - assert_eq!(2, metadata.columns_row_key.num_row_key_columns()); + assert_eq!(2, metadata.columns.num_row_key_columns()); let row_key_names: Vec<_> = metadata - .columns_row_key + .columns .iter_row_key_columns() .map(|column| &column.desc.name) .collect(); assert_eq!(["k1", "ts"], &row_key_names[..]); // 1 value column - assert_eq!(1, metadata.columns_row_key.num_value_columns()); + assert_eq!(1, metadata.columns.num_value_columns()); let value_names: Vec<_> = metadata - .columns_row_key + .columns .iter_value_columns() .map(|column| &column.desc.name) .collect(); assert_eq!(["v1"], &value_names[..]); // Check timestamp index. - assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index); + assert_eq!(1, metadata.columns.timestamp_key_index); // Check version column. - assert!(!metadata.columns_row_key.row_key.enable_version_column); + assert!(!metadata.columns.enable_version_column); assert!(metadata .column_families @@ -502,14 +740,14 @@ mod tests { Some(1), ); - assert_eq!(expect_schema, metadata.schema); + assert_eq!(expect_schema, *metadata.user_schema()); - // 4 columns - assert_eq!(4, metadata.columns_row_key.columns.columns.len()); + // 4 user columns and 2 internal columns. 
+ assert_eq!(6, metadata.columns.columns.len()); // 3 row key columns - assert_eq!(3, metadata.columns_row_key.num_row_key_columns()); + assert_eq!(3, metadata.columns.num_row_key_columns()); let row_key_names: Vec<_> = metadata - .columns_row_key + .columns .iter_row_key_columns() .map(|column| &column.desc.name) .collect(); @@ -518,16 +756,25 @@ mod tests { &row_key_names[..] ); // 1 value column - assert_eq!(1, metadata.columns_row_key.num_value_columns()); + assert_eq!(1, metadata.columns.num_value_columns()); let value_names: Vec<_> = metadata - .columns_row_key + .columns .iter_value_columns() .map(|column| &column.desc.name) .collect(); assert_eq!(["v1"], &value_names[..]); // Check timestamp index. - assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index); + assert_eq!(1, metadata.columns.timestamp_key_index); // Check version column. - assert!(metadata.columns_row_key.row_key.enable_version_column); + assert!(metadata.columns.enable_version_column); + } + + #[test] + fn test_convert_between_raw() { + let metadata = new_metadata(true); + let raw = RawRegionMetadata::from(&metadata); + + let converted = RegionMetadata::try_from(raw).unwrap(); + assert_eq!(metadata, converted); } } diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs index 2dcc5f2231..fe29451e87 100644 --- a/src/storage/src/proto/write_batch.rs +++ b/src/storage/src/proto/write_batch.rs @@ -36,11 +36,10 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Invalid timestamp index: {}", index))] - InvalidTimestampIndex { - index: usize, + #[snafu(display("Failed to convert schema, source: {}", source))] + ConvertSchema { + #[snafu(backtrace)] source: datatypes::error::Error, - backtrace: Backtrace, }, } @@ -81,10 +80,10 @@ impl TryFrom for schema::SchemaRef { let schema: schema::SchemaRef = match schema.timestamp_index { Some(index) => Arc::new( - schema::Schema::with_timestamp_index(column_schemas, index.value as usize) - .context(InvalidTimestampIndexSnafu { - index: index.value as usize, - })?, + schema::SchemaBuilder::from(column_schemas) + .timestamp_index(index.value as usize) + .build() + .context(ConvertSchemaSnafu)?, ), None => Arc::new(schema::Schema::new(column_schemas)), }; diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 84788f7ad4..fce78bbf79 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_telemetry::logging; use datatypes::schema::SchemaRef; -use snafu::ensure; +use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::manifest::{ self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator, @@ -103,7 +103,7 @@ impl RegionImpl { .update(RegionMetaActionList::new(vec![ RegionMetaAction::Protocol(ProtocolAction::new()), RegionMetaAction::Change(RegionChange { - metadata: metadata.clone(), + metadata: (&*metadata).into(), }), ])) .await?; @@ -206,8 +206,13 @@ impl RegionImpl { for action in action_list.actions { match (action, version) { (RegionMetaAction::Change(c), None) => { + let region = c.metadata.name.clone(); + let region_metadata = c + .metadata + .try_into() + .context(error::InvalidRawRegionSnafu { region })?; version = Some(Version::with_manifest_version( - c.metadata, + Arc::new(region_metadata), last_manifest_version, )); for (manifest_version, action) in actions.drain(..) 
{ diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 7b7f7ce969..93fc267260 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -197,7 +197,7 @@ async fn test_recover_region_manifets() { manifest .update(RegionMetaActionList::with_action(RegionMetaAction::Change( RegionChange { - metadata: region_meta.clone(), + metadata: (&*region_meta).into(), }, ))) .await diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 74bb9632a7..aac1d1031f 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -309,7 +309,7 @@ impl WriterInner { && memtables_to_add.get_by_range(range).is_none() { // Memtable for this range is missing, need to create a new memtable. - let memtable_schema = current_version.memtable_schema(); + let memtable_schema = current_version.schema().clone(); let id = self.alloc_memtable_id(); let memtable = self.memtable_builder.build(id, memtable_schema); memtables_to_add.insert(*range, memtable); diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs new file mode 100644 index 0000000000..06ad6ad8ac --- /dev/null +++ b/src/storage/src/schema.rs @@ -0,0 +1,450 @@ +use std::sync::Arc; + +use common_error::prelude::*; +use datatypes::arrow::array::Array; +use datatypes::arrow::chunk::Chunk; +use datatypes::arrow::datatypes::Schema as ArrowSchema; +use datatypes::prelude::Vector; +use datatypes::schema::Metadata; +use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector}; +use serde::{Deserialize, Serialize}; +use snafu::ensure; +use store_api::storage::{consts, ColumnSchema, Schema, SchemaBuilder, SchemaRef}; + +use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef}; +use crate::read::Batch; + +const ROW_KEY_END_KEY: &str = "greptime:storage:row_key_end"; +const USER_COLUMN_END_KEY: &str = "greptime:storage:user_column_end"; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Failed to build schema, source: {}", source))] + BuildSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to convert from arrow schema, source: {}", source))] + ConvertArrowSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Invalid internal column index in arrow schema"))] + InvalidIndex { backtrace: Backtrace }, + + #[snafu(display("Missing metadata {} in arrow schema", key))] + MissingMeta { key: String, backtrace: Backtrace }, + + #[snafu(display("Missing column {} in arrow schema", column))] + MissingColumn { + column: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to parse index in schema meta, value: {}, source: {}", + value, + source + ))] + ParseIndex { + value: String, + source: std::num::ParseIntError, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to convert arrow chunk to batch, name: {}, source: {}", + name, + source + ))] + ConvertChunk { + name: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, +} + +pub type Result = std::result::Result; + +/// Schema of region. +/// +/// The `RegionSchema` has the knowledge of reserved and internal columns. +/// Reserved columns are columns that their names, ids are reserved by the storage +/// engine, and could not be used by the user. Reserved columns usually have +/// special usage. 
Reserved columns, except the version column, are also
+/// called internal columns (though the version column could also be thought of as a
+/// special kind of internal column). Internal columns, such as our internal sequence
+/// and value_type columns, are not visible to users.
+///
+/// The user schema is the schema that only contains columns the user can access,
+/// i.e. the schema as the user defined it.
+#[derive(Debug, PartialEq)]
+pub struct RegionSchema {
+    /// Schema that only contains columns that the user defined, excluding internal columns
+    /// that are reserved and used by the storage engine.
+    ///
+    /// Holding a [SchemaRef] to allow converting into `SchemaRef`/`arrow::SchemaRef`
+    /// conveniently. The field order in `SchemaRef` **must** be consistent with the
+    /// column order in [ColumnsMetadata] to ensure the projection index of a field
+    /// is correct.
+    user_schema: SchemaRef,
+    /// SST schema contains all columns of the region, including all internal columns.
+    sst_schema: SstSchema,
+    /// Metadata of columns.
+    columns: ColumnsMetadataRef,
+}
+
+impl RegionSchema {
+    pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result<RegionSchema> {
+        let user_schema = Arc::new(build_user_schema(&columns, version)?);
+        let sst_schema = SstSchema::new(&columns, version)?;
+
+        debug_assert_eq!(user_schema.version(), sst_schema.version());
+        debug_assert_eq!(version, user_schema.version());
+
+        Ok(RegionSchema {
+            user_schema,
+            sst_schema,
+            columns,
+        })
+    }
+
+    /// Returns the schema of the region, excluding internal columns that are used by
+    /// the storage engine.
+    #[inline]
+    pub fn user_schema(&self) -> &SchemaRef {
+        &self.user_schema
+    }
+
+    /// Returns the schema for sst, which contains all columns used by the region.
+    #[inline]
+    pub fn sst_schema(&self) -> &SstSchema {
+        &self.sst_schema
+    }
+
+    #[inline]
+    pub fn row_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
+        self.columns.iter_row_key_columns()
+    }
+
+    #[inline]
+    pub fn value_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
+        self.columns.iter_value_columns()
+    }
+
+    #[inline]
+    pub fn num_row_key_columns(&self) -> usize {
+        self.columns.num_row_key_columns()
+    }
+
+    #[inline]
+    pub fn num_value_columns(&self) -> usize {
+        self.columns.num_value_columns()
+    }
+
+    #[inline]
+    pub fn version(&self) -> u32 {
+        self.user_schema.version()
+    }
+}
+
+pub type RegionSchemaRef = Arc<RegionSchema>;
+
+/// Schema of SST.
+///
+/// Only contains a reference to schema and some indices, so it should be cheap to clone.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SstSchema { + schema: SchemaRef, + row_key_end: usize, + user_column_end: usize, +} + +impl SstSchema { + fn new(columns: &ColumnsMetadata, version: u32) -> Result { + let column_schemas: Vec<_> = columns + .iter_all_columns() + .map(|col| ColumnSchema::from(&col.desc)) + .collect(); + + let schema = SchemaBuilder::from(column_schemas) + .timestamp_index(columns.timestamp_key_index()) + .version(version) + .add_metadata(ROW_KEY_END_KEY, columns.row_key_end().to_string()) + .add_metadata(USER_COLUMN_END_KEY, columns.user_column_end().to_string()) + .build() + .context(BuildSchemaSnafu)?; + + let user_column_end = columns.user_column_end(); + assert_eq!( + consts::SEQUENCE_COLUMN_NAME, + schema.column_schemas()[user_column_end].name + ); + assert_eq!( + consts::VALUE_TYPE_COLUMN_NAME, + schema.column_schemas()[user_column_end + 1].name + ); + + Ok(SstSchema { + schema: Arc::new(schema), + row_key_end: columns.row_key_end(), + user_column_end, + }) + } + + #[inline] + pub fn version(&self) -> u32 { + self.schema.version() + } + + #[inline] + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + #[inline] + pub fn arrow_schema(&self) -> &Arc { + self.schema.arrow_schema() + } + + pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> Chunk> { + assert_eq!( + self.schema.num_columns(), + // key columns + value columns + sequence + value_type + batch.keys.len() + batch.values.len() + 2 + ); + + Chunk::new( + batch + .keys + .iter() + .map(|v| v.to_arrow_array()) + .chain(batch.values.iter().map(|v| v.to_arrow_array())) + .chain(std::iter::once(batch.sequences.to_arrow_array())) + .chain(std::iter::once(batch.value_types.to_arrow_array())) + .collect(), + ) + } + + pub fn arrow_chunk_to_batch(&self, chunk: &Chunk>) -> Result { + let keys = self + .row_key_indices() + .map(|i| { + Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu { + name: self.column_name(i), + }) + }) + .collect::>()?; + let sequences = UInt64Vector::try_from_arrow_array(&chunk[self.sequence_index()].clone()) + .context(ConvertChunkSnafu { + name: consts::SEQUENCE_COLUMN_NAME, + })?; + let value_types = UInt8Vector::try_from_arrow_array( + &chunk[self.value_type_index()].clone(), + ) + .context(ConvertChunkSnafu { + name: consts::VALUE_TYPE_COLUMN_NAME, + })?; + let values = self + .value_indices() + .map(|i| { + Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu { + name: self.column_name(i), + }) + }) + .collect::>()?; + + Ok(Batch { + keys, + sequences, + value_types, + values, + }) + } + + #[inline] + fn sequence_index(&self) -> usize { + self.user_column_end + } + + #[inline] + fn value_type_index(&self) -> usize { + self.user_column_end + 1 + } + + #[inline] + fn row_key_indices(&self) -> impl Iterator { + 0..self.row_key_end + } + + #[inline] + fn value_indices(&self) -> impl Iterator { + self.row_key_end..self.user_column_end + } + + #[inline] + fn column_name(&self, idx: usize) -> &str { + &self.schema.column_schemas()[idx].name + } +} + +impl TryFrom for SstSchema { + type Error = Error; + + fn try_from(arrow_schema: ArrowSchema) -> Result { + let schema = Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?; + // Recover other metadata from schema. + let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?; + let user_column_end = parse_index_from_metadata(schema.metadata(), USER_COLUMN_END_KEY)?; + + // There should be sequence and value type columns. 
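As an illustration (not introduced by this patch), the two `ensure!` checks that follow rely on the sst schema always placing the internal columns right after the user columns. For the two-key/one-value region used in the tests later in this file, the layout and the recovered metadata values would be:

// Illustration only — indices assume row_key_end = 2 and user_column_end = 3:
//   0: k1            (row key)
//   1: timestamp     (row key, time index)
//   2: v1            (value)
//   3: __sequence    (internal, index == user_column_end)
//   4: __value_type  (internal, index == user_column_end + 1)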
+ ensure!( + consts::SEQUENCE_COLUMN_NAME == schema.column_schemas()[user_column_end].name, + InvalidIndexSnafu + ); + ensure!( + consts::VALUE_TYPE_COLUMN_NAME == schema.column_schemas()[user_column_end + 1].name, + InvalidIndexSnafu + ); + + Ok(SstSchema { + schema: Arc::new(schema), + row_key_end, + user_column_end, + }) + } +} + +fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result { + let value = metadata.get(key).context(MissingMetaSnafu { key })?; + value.parse().context(ParseIndexSnafu { value }) +} + +// Now user schema don't have extra metadata like sst schema. +fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result { + let column_schemas: Vec<_> = columns + .iter_user_columns() + .map(|col| ColumnSchema::from(&col.desc)) + .collect(); + + SchemaBuilder::from(column_schemas) + .timestamp_index(columns.timestamp_key_index()) + .version(version) + .build() + .context(BuildSchemaSnafu) +} + +#[cfg(test)] +mod tests { + use datatypes::type_id::LogicalTypeId; + use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector}; + + use super::*; + use crate::metadata::RegionMetadata; + use crate::test_util::{descriptor_util::RegionDescBuilder, schema_util}; + + fn new_batch() -> Batch { + let k1 = Int64Vector::from_slice(&[1, 2, 3]); + let timestamp = Int64Vector::from_slice(&[4, 5, 6]); + let v1 = Int64Vector::from_slice(&[7, 8, 9]); + + Batch { + keys: vec![Arc::new(k1), Arc::new(timestamp)], + values: vec![Arc::new(v1)], + sequences: UInt64Vector::from_slice(&[100, 100, 100]), + value_types: UInt8Vector::from_slice(&[0, 0, 0]), + } + } + + fn check_chunk_batch(chunk: &Chunk>, batch: &Batch) { + assert_eq!(5, chunk.columns().len()); + assert_eq!(3, chunk.len()); + + for i in 0..2 { + assert_eq!(chunk[i], batch.keys[i].to_arrow_array()); + } + assert_eq!(chunk[2], batch.values[0].to_arrow_array()); + assert_eq!(chunk[3], batch.sequences.to_arrow_array()); + assert_eq!(chunk[4], batch.value_types.to_arrow_array()); + } + + #[test] + fn test_region_schema() { + let desc = RegionDescBuilder::new("test") + .push_key_column(("k1", LogicalTypeId::Int64, false)) + .push_value_column(("v1", LogicalTypeId::Int64, true)) + .build(); + let metadata: RegionMetadata = desc.try_into().unwrap(); + + let columns = metadata.columns; + let region_schema = RegionSchema::new(columns.clone(), 0).unwrap(); + + let expect_schema = schema_util::new_schema( + &[ + ("k1", LogicalTypeId::Int64, false), + ("timestamp", LogicalTypeId::Int64, false), + ("v1", LogicalTypeId::Int64, true), + ], + Some(1), + ); + + assert_eq!(expect_schema, **region_schema.user_schema()); + + let mut row_keys = region_schema.row_key_columns(); + assert_eq!("k1", row_keys.next().unwrap().desc.name); + assert_eq!("timestamp", row_keys.next().unwrap().desc.name); + assert_eq!(None, row_keys.next()); + assert_eq!(2, region_schema.num_row_key_columns()); + + let mut values = region_schema.value_columns(); + assert_eq!("v1", values.next().unwrap().desc.name); + assert_eq!(None, values.next()); + assert_eq!(1, region_schema.num_value_columns()); + + assert_eq!(0, region_schema.version()); + { + let region_schema = RegionSchema::new(columns, 1234).unwrap(); + assert_eq!(1234, region_schema.version()); + assert_eq!(1234, region_schema.sst_schema().version()); + } + + let sst_schema = region_schema.sst_schema(); + let sst_arrow_schema = sst_schema.arrow_schema(); + let converted_sst_schema = SstSchema::try_from((**sst_arrow_schema).clone()).unwrap(); + + assert_eq!(*sst_schema, converted_sst_schema); + + let 
expect_schema = schema_util::new_schema( + &[ + ("k1", LogicalTypeId::Int64, false), + ("timestamp", LogicalTypeId::Int64, false), + ("v1", LogicalTypeId::Int64, true), + (consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false), + (consts::VALUE_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false), + ], + Some(1), + ); + assert_eq!( + expect_schema.column_schemas(), + sst_schema.schema().column_schemas() + ); + assert_eq!(3, sst_schema.sequence_index()); + assert_eq!(4, sst_schema.value_type_index()); + let row_key_indices: Vec<_> = sst_schema.row_key_indices().collect(); + assert_eq!([0, 1], &row_key_indices[..]); + let value_indices: Vec<_> = sst_schema.value_indices().collect(); + assert_eq!([2], &value_indices[..]); + + // Test batch and chunk conversion. + let batch = new_batch(); + // Convert batch to chunk. + let chunk = sst_schema.batch_to_arrow_chunk(&batch); + check_chunk_batch(&chunk, &batch); + + // Convert chunk to batch. + let converted_batch = sst_schema.arrow_chunk_to_batch(&chunk).unwrap(); + check_chunk_batch(&chunk, &converted_batch); + } +} diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index ca861a9f96..50adc5e338 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -26,7 +26,7 @@ impl Snapshot for SnapshotImpl { type Reader = ChunkReaderImpl; fn schema(&self) -> &SchemaRef { - self.version.schema() + self.version.user_schema() } async fn scan( @@ -41,7 +41,7 @@ impl Snapshot for SnapshotImpl { let immutables = memtable_version.immutable_memtables(); let mut builder = - ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone()) + ChunkReaderBuilder::new(self.version.user_schema().clone(), self.sst_layer.clone()) .reserve_num_memtables(memtable_version.num_memtables()) .iter_ctx(IterContext { batch_size: ctx.batch_size, diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 8db579f36d..e0799d0463 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -15,22 +15,15 @@ use datatypes::arrow::io::parquet::read::{ use datatypes::arrow::io::parquet::write::{ Compression, Encoding, FileSink, Version, WriteOptions, }; -use datatypes::prelude::{ConcreteDataType, Vector}; -use datatypes::schema::ColumnSchema; -use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector}; use futures_util::sink::SinkExt; use futures_util::{Stream, TryStreamExt}; use object_store::{ObjectStore, SeekableReader}; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::consts; +use snafu::ResultExt; -use crate::error::{ - FlushIoSnafu, InvalidParquetSchemaSnafu, ReadParquetIoSnafu, ReadParquetSnafu, Result, - SequenceColumnNotFoundSnafu, WriteParquetSnafu, -}; -use crate::memtable::{BoxedBatchIterator, MemtableSchema}; -use crate::metadata::ColumnMetadata; +use crate::error::{self, Result}; +use crate::memtable::BoxedBatchIterator; use crate::read::{Batch, BatchReader}; +use crate::schema::SstSchema; use crate::sst; /// Parquet sst writer. @@ -61,20 +54,23 @@ impl<'a> ParquetWriter<'a> { /// A chunk of records yielded from each iteration with a size given /// in config will be written to a single row group. 
async fn write_rows(self, extra_meta: Option>) -> Result<()> { - let schema = memtable_schema_to_arrow_schema(self.iter.schema()); + let region_schema = self.iter.schema(); + let sst_schema = region_schema.sst_schema(); + let schema = sst_schema.arrow_schema(); let object = self.object_store.object(self.file_path); // FIXME(hl): writer size is not used in fs backend so just leave it to 0, // but in s3/azblob backend the Content-Length field of HTTP request is set // to this value. - let writer = object.writer(0).await.context(FlushIoSnafu)?; + let writer = object.writer(0).await.context(error::FlushIoSnafu)?; // now all physical types use plain encoding, maybe let caller to choose encoding for each type. - let encodings = get_encoding_for_schema(&schema, |_| Encoding::Plain); + let encodings = get_encoding_for_schema(&*schema, |_| Encoding::Plain); let mut sink = FileSink::try_new( writer, - schema, + // The file sink needs the `Schema` instead of a reference. + (**schema).clone(), encodings, WriteOptions { write_statistics: true, @@ -82,22 +78,13 @@ impl<'a> ParquetWriter<'a> { version: Version::V2, }, ) - .context(WriteParquetSnafu)?; + .context(error::WriteParquetSnafu)?; for batch in self.iter { let batch = batch?; - sink.send(Chunk::new( - batch - .keys - .iter() - .map(|v| v.to_arrow_array()) - .chain(std::iter::once(batch.sequences.to_arrow_array())) - .chain(std::iter::once(batch.value_types.to_arrow_array())) - .chain(batch.values.iter().map(|v| v.to_arrow_array())) - .collect(), - )) - .await - .context(WriteParquetSnafu)?; + sink.send(sst_schema.batch_to_arrow_chunk(&batch)) + .await + .context(error::WriteParquetSnafu)?; } if let Some(meta) = extra_meta { @@ -105,42 +92,13 @@ impl<'a> ParquetWriter<'a> { sink.metadata.insert(k, Some(v)); } } - sink.close().await.context(WriteParquetSnafu)?; + sink.close().await.context(error::WriteParquetSnafu)?; // FIXME(yingwen): Hack to workaround an [arrow2 BUG](https://github.com/jorgecarleitao/parquet2/issues/162), // upgrading to latest arrow2 can fixed this, but now datafusion is still using an old arrow2 version. - sink.flush().await.context(WriteParquetSnafu) + sink.flush().await.context(error::WriteParquetSnafu) } } -/// Assembles arrow schema from memtable schema info. -/// TODO(hl): implement `From` for `MemtableSchema`/`Physical schema` -fn memtable_schema_to_arrow_schema(schema: &MemtableSchema) -> Schema { - let col_meta_to_field: fn(&ColumnMetadata) -> Field = |col_meta| { - Field::from(&ColumnSchema::new( - col_meta.desc.name.clone(), - col_meta.desc.data_type.clone(), - col_meta.desc.is_nullable, - )) - }; - - let fields = schema - .row_key_columns() - .map(col_meta_to_field) - .chain(std::iter::once(Field::from(&ColumnSchema::new( - consts::SEQUENCE_COLUMN_NAME, - ConcreteDataType::uint64_datatype(), - false, - )))) - .chain(std::iter::once(Field::from(&ColumnSchema::new( - consts::VALUE_TYPE_COLUMN_NAME, - ConcreteDataType::uint8_datatype(), - false, - )))) - .chain(schema.value_columns().map(col_meta_to_field)) - .collect::>(); - Schema::from(fields) -} - fn get_encoding_for_schema Encoding + Clone>( schema: &Schema, map: F, @@ -213,9 +171,11 @@ impl<'a> ParquetReader<'a> { } } + // TODO(yingwen): Projection is not supported now, since field index would change after projection. + // To support projection, we may need to implement some helper methods in schema. 
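As a hedged sketch of the kind of helper the TODO above alludes to (the function below is hypothetical and not added by this patch): if a projection kept only a subset of user columns while the internal columns stayed appended at the end of the projected sst schema, their indices could be derived from the number of selected user columns.

// Hypothetical sketch, not part of this patch: derive the internal column
// indices of a projected sst schema from the selected user columns.
fn projected_internal_indices(selected_user_columns: &[usize]) -> (usize, usize) {
    let sequence_index = selected_user_columns.len();
    let value_type_index = sequence_index + 1;
    (sequence_index, value_type_index)
}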
pub async fn chunk_stream( &self, - projection: Option, + _projection: Option, chunk_size: usize, ) -> Result { let file_path = self.file_path.to_string(); @@ -229,17 +189,17 @@ impl<'a> ParquetReader<'a> { let file_path = self.file_path.to_string(); let mut reader = reader_factory() .await - .context(ReadParquetIoSnafu { file: &file_path })?; + .context(error::ReadParquetIoSnafu { file: &file_path })?; let metadata = read_metadata_async(&mut reader) .await - .context(ReadParquetSnafu { file: &file_path })?; - let schema = infer_schema(&metadata).context(ReadParquetSnafu { file: &file_path })?; + .context(error::ReadParquetSnafu { file: &file_path })?; + let arrow_schema = + infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?; + // Just read all fields. + let projected_fields = arrow_schema.fields.clone(); - let projected_fields = projection.map_or_else(|| schema.fields.clone(), |p| p(&schema)); - let projected_schema = Schema { - fields: projected_fields.clone(), - metadata: schema.metadata, - }; + let sst_schema = SstSchema::try_from(arrow_schema) + .context(error::ConvertSstSchemaSnafu { file: &file_path })?; let chunk_stream = try_stream!({ for rg in metadata.row_groups { @@ -250,36 +210,31 @@ impl<'a> ParquetReader<'a> { Some(chunk_size), ) .await - .context(ReadParquetSnafu { file: &file_path })?; + .context(error::ReadParquetSnafu { file: &file_path })?; let chunks = RowGroupDeserializer::new(column_chunks, rg.num_rows() as usize, None); for maybe_chunk in chunks { let columns_in_chunk = - maybe_chunk.context(ReadParquetSnafu { file: &file_path })?; + maybe_chunk.context(error::ReadParquetSnafu { file: &file_path })?; yield columns_in_chunk; } } }); - ChunkStream::new(projected_schema, Box::pin(chunk_stream)) + ChunkStream::new(sst_schema, Box::pin(chunk_stream)) } } -type ChunkConverter = Box>) -> Result + Send + Sync>; - pub type SendableChunkStream = Pin>>> + Send>>; pub struct ChunkStream { + schema: SstSchema, stream: SendableChunkStream, - converter: ChunkConverter, } impl ChunkStream { - pub fn new(schema: Schema, stream: SendableChunkStream) -> Result { - Ok(Self { - converter: batch_converter_factory(schema)?, - stream, - }) + pub fn new(schema: SstSchema, stream: SendableChunkStream) -> Result { + Ok(Self { schema, stream }) } } @@ -289,63 +244,15 @@ impl BatchReader for ChunkStream { self.stream .try_next() .await? - .map(&self.converter) + .map(|chunk| { + self.schema + .arrow_chunk_to_batch(&chunk) + .context(error::InvalidParquetSchemaSnafu) + }) .transpose() } } -// TODO(hl): Currently rowkey/values/reserved columns are identified by their position in field: -// all fields before `__sequence` column are rowkeys and fields after `__value_type` are values. -// But it would be better to persist rowkey/value columns' positions to Parquet metadata and -// parse `MemtableSchema` from metadata while building BatchReader. -fn batch_converter_factory(schema: Schema) -> Result { - let ts_idx = schema - .fields - .iter() - .position(|f| f.name == consts::SEQUENCE_COLUMN_NAME) - .with_context(|| SequenceColumnNotFoundSnafu { - msg: format!("Schema: {:?}", schema), - })?; - - let field_len = schema.fields.len(); - - macro_rules! handle_err { - ($stmt: expr, $schema: ident) => { - $stmt.with_context(|_| InvalidParquetSchemaSnafu { - msg: format!("Schema type error: {:?}", $schema), - })? 
- }; - } - - let converter = move |c: Chunk>| { - Ok(Batch { - sequences: handle_err!( - UInt64Vector::try_from_arrow_array(&c.arrays()[ts_idx].clone()), - schema - ), - value_types: handle_err!( - UInt8Vector::try_from_arrow_array(&c.arrays()[ts_idx + 1].clone()), - schema - ), - keys: handle_err!( - (0..ts_idx) - .into_iter() - .map(|i| Helper::try_into_vector(&c.arrays()[i].clone())) - .collect::>(), - schema - ), - values: handle_err!( - (ts_idx + 2..field_len) - .into_iter() - .map(|i| Helper::try_into_vector(&c.arrays()[i].clone())) - .collect::>(), - schema - ), - }) - }; - Ok(Box::new(converter)) -} - #[cfg(test)] mod tests { use std::sync::Arc; @@ -398,10 +305,11 @@ mod tests { let reader = std::fs::File::open(dir.path().join(sst_file_name)).unwrap(); let mut file_reader = FileReader::try_new(reader, None, Some(128), None, None).unwrap(); - // chunk schema: timestamp, __version, __sequence, __value_type, v1 + // chunk schema: timestamp, __version, v1, __sequence, __value_type let chunk = file_reader.next().unwrap().unwrap(); assert_eq!(5, chunk.arrays().len()); + // timestamp assert_eq!( Arc::new(Int64Array::from_slice(&[ 1000, 1000, 1001, 2002, 2003, 2003 @@ -409,23 +317,27 @@ mod tests { chunk.arrays()[0] ); + // version assert_eq!( Arc::new(UInt64Array::from_slice(&[1, 2, 1, 1, 1, 5])) as Arc, chunk.arrays()[1] ); + // v1 assert_eq!( - Arc::new(UInt64Array::from_slice(&[10, 10, 10, 10, 10, 10])) as Arc, + Arc::new(UInt64Array::from_slice(&[1, 2, 3, 7, 8, 9])) as Arc, chunk.arrays()[2] ); + // sequence assert_eq!( - Arc::new(UInt8Array::from_slice(&[0, 0, 0, 0, 0, 0])) as Arc, + Arc::new(UInt64Array::from_slice(&[10, 10, 10, 10, 10, 10])) as Arc, chunk.arrays()[3] ); + // value type assert_eq!( - Arc::new(UInt64Array::from_slice(&[1, 2, 3, 7, 8, 9])) as Arc, + Arc::new(UInt8Array::from_slice(&[0, 0, 0, 0, 0, 0])) as Arc, chunk.arrays()[4] ); } diff --git a/src/storage/src/test_util/schema_util.rs b/src/storage/src/test_util/schema_util.rs index d99dbb90b7..ac0602359d 100644 --- a/src/storage/src/test_util/schema_util.rs +++ b/src/storage/src/test_util/schema_util.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use datatypes::prelude::*; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; /// Column definition: (name, datatype, is_nullable) pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool); @@ -16,7 +16,10 @@ pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option) -> .collect(); if let Some(index) = timestamp_index { - Schema::with_timestamp_index(column_schemas, index).unwrap() + SchemaBuilder::from(column_schemas) + .timestamp_index(index) + .build() + .unwrap() } else { Schema::new(column_schemas) } diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index a5fd13f8b0..93f81cd09f 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -14,8 +14,9 @@ use std::time::Duration; use store_api::manifest::ManifestVersion; use store_api::storage::{SchemaRef, SequenceNumber}; -use crate::memtable::{MemtableId, MemtableSchema, MemtableSet, MemtableVersion}; +use crate::memtable::{MemtableId, MemtableSet, MemtableVersion}; use crate::metadata::RegionMetadataRef; +use crate::schema::RegionSchemaRef; use crate::sst::LevelMetas; use crate::sst::{FileHandle, FileMeta}; use crate::sync::CowCell; @@ -170,8 +171,13 @@ impl Version { } #[inline] - pub fn schema(&self) -> &SchemaRef { - &self.metadata.schema + pub fn schema(&self) -> &RegionSchemaRef { + 
self.metadata.schema()
+    }
+
+    #[inline]
+    pub fn user_schema(&self) -> &SchemaRef {
+        self.metadata.user_schema()
+    }
 
     #[inline]
@@ -199,11 +205,6 @@ impl Version {
         DEFAULT_BUCKET_DURATION
     }
 
-    #[inline]
-    pub fn memtable_schema(&self) -> MemtableSchema {
-        MemtableSchema::new(self.metadata.columns_row_key.clone())
-    }
-
     pub fn apply_edit(&mut self, edit: VersionEdit) {
         let flushed_sequence = edit.flushed_sequence.unwrap_or(self.flushed_sequence);
         if self.flushed_sequence < flushed_sequence {
diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs
index 8616c981b9..b2cb9a78f1 100644
--- a/src/store-api/src/storage.rs
+++ b/src/store-api/src/storage.rs
@@ -12,7 +12,7 @@ mod snapshot;
 mod types;
 
 pub use datatypes::data_type::ConcreteDataType;
-pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+pub use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
 
 pub use self::chunk::{Chunk, ChunkReader};
 pub use self::descriptors::*;
diff --git a/src/store-api/src/storage/consts.rs b/src/store-api/src/storage/consts.rs
index 536e0608cd..93572d609f 100644
--- a/src/store-api/src/storage/consts.rs
+++ b/src/store-api/src/storage/consts.rs
@@ -2,11 +2,11 @@ use crate::storage::descriptors::{ColumnFamilyId, ColumnId};
 
-// ---------- Ids reserved for internal column families ------------------------
+// ---------- Reserved column family ids ---------------------------------------
 
 /// Column family Id for row key columns.
 ///
-/// This is virtual column family, actually row key columns are not
+/// This is a virtual column family; row key columns are not actually
 /// stored in any column family.
 pub const KEY_CF_ID: ColumnFamilyId = 0;
 /// Id for default column family.
@@ -14,11 +14,45 @@ pub const DEFAULT_CF_ID: ColumnFamilyId = 1;
 
 // -----------------------------------------------------------------------------
 
-// ---------- Ids reserved for internal columns --------------------------------
+// ---------- Reserved column ids ----------------------------------------------
 
-// TODO(yingwen): Reserve one bit for internal columns.
-/// Column id for version column.
-pub const VERSION_COLUMN_ID: ColumnId = 1;
+// The reserved column ids are too large to be defined as enum values (denied by the
+// `clippy::enum_clike_unportable_variant` lint), so we use this enum as an offset
+// that `ReservedColumnId` adds to its base to get the final column id.
+enum ReservedColumnType {
+    Version = 0,
+    Sequence,
+    ValueType,
+}
+
+/// Column id reserved by the engine.
+///
+/// All reserved column ids have the MSB (Most Significant Bit) set to 1.
+///
+/// Reserved columns include the version column and other internal columns.
+pub struct ReservedColumnId;
+
+impl ReservedColumnId {
+    // Set MSB to 1.
+    const BASE: ColumnId = 1 << (ColumnId::BITS - 1);
+
+    /// Column id for the version column.
+    /// The version column is a special reserved column that can be enabled by the user
+    /// and is visible to the user.
+    pub const fn version() -> ColumnId {
+        Self::BASE | ReservedColumnType::Version as ColumnId
+    }
+
+    /// Id for `__sequence` column.
+    pub const fn sequence() -> ColumnId {
+        Self::BASE | ReservedColumnType::Sequence as ColumnId
+    }
+
+    /// Id for `__value_type` column.
+    pub const fn value_type() -> ColumnId {
+        Self::BASE | ReservedColumnType::ValueType as ColumnId
+    }
+}
 
 // -----------------------------------------------------------------------------
 
@@ -36,6 +70,7 @@ pub const SEQUENCE_COLUMN_NAME: &str = "__sequence";
 
 /// Name for time index constraint name.
pub const TIME_INDEX_NAME: &str = "__time_index"; +// TODO(yingwen): `__op_type` might be proper than `__value_type`. /// Name for reserved column: value_type pub const VALUE_TYPE_COLUMN_NAME: &str = "__value_type"; @@ -46,3 +81,15 @@ pub const VALUE_TYPE_COLUMN_NAME: &str = "__value_type"; pub const READ_BATCH_SIZE: usize = 256; // ----------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_reserved_id() { + assert_eq!(0x80000000, ReservedColumnId::version()); + assert_eq!(0x80000001, ReservedColumnId::sequence()); + assert_eq!(0x80000002, ReservedColumnId::value_type()); + } +} diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index 293db4b89f..fa68020e5c 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -3,8 +3,7 @@ mod mock_engine; use std::sync::Arc; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::SchemaRef; -use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; use log_store::fs::noop::NoopLogStore; use object_store::{backend::fs::Backend, ObjectStore}; use storage::config::EngineConfig as StorageEngineConfig; @@ -32,7 +31,10 @@ pub fn schema_for_test() -> Schema { ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; - Schema::with_timestamp_index(column_schemas, 0).expect("ts must be timestamp column") + SchemaBuilder::from(column_schemas) + .timestamp_index(0) + .build() + .expect("ts must be timestamp column") } pub type MockMitoEngine = MitoEngine; diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs index be8fd4348b..a18b1390ab 100644 --- a/src/table-engine/src/table/test_util/mock_engine.rs +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -43,7 +43,7 @@ impl Snapshot for MockSnapshot { type Reader = MockChunkReader; fn schema(&self) -> &SchemaRef { - &self.metadata.schema + self.metadata.user_schema() } async fn scan( @@ -52,7 +52,7 @@ impl Snapshot for MockSnapshot { _request: ScanRequest, ) -> Result> { let reader = MockChunkReader { - schema: self.metadata.schema.clone(), + schema: self.metadata.user_schema().clone(), }; Ok(ScanResponse { reader })
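To make the MSB convention used by `ReservedColumnId` concrete, here is a minimal, self-contained sketch; `is_reserved_column_id` is a hypothetical helper rather than an API added by this patch, and `ColumnId` is assumed to be `u32`, matching the ids asserted in `test_reserved_id`.

// Hypothetical helper, not part of this patch.
type ColumnId = u32;

const RESERVED_ID_BASE: ColumnId = 1 << (ColumnId::BITS - 1);

fn is_reserved_column_id(id: ColumnId) -> bool {
    // Reserved ids all have the most significant bit set.
    id & RESERVED_ID_BASE != 0
}

fn main() {
    assert!(is_reserved_column_id(0x8000_0000)); // ReservedColumnId::version()
    assert!(is_reserved_column_id(0x8000_0002)); // ReservedColumnId::value_type()
    assert!(!is_reserved_column_id(1)); // an ordinary user-assigned column id
}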