From 7c779a98612a4f2fdcb45aba3b855c10a137511c Mon Sep 17 00:00:00 2001
From: evenyag
Date: Wed, 17 Aug 2022 15:28:38 +0800
Subject: [PATCH] feat: Add region schema for storage engine (#171)

* refactor: Merge RowKeyMetadata into ColumnsMetadata

RowKeyMetadata and ColumnsMetadata are almost always used together, so there is
no need to keep them as two separate structs. They are now combined into a
single ColumnsMetadata struct.

chore: Make some fields of metadata private

feat: Replace schema in RegionMetadata by RegionSchema

The internal schema of a region should have knowledge of all internal columns
that are reserved and used by the storage engine, such as sequence and value
type. So we introduce `RegionSchema`, which holds a `SchemaRef` that only
contains the columns the user could see.

feat: Value derives Serialize and supports converting into json value

feat: Add version to schema

The schema version has an initial value of 0 and is bumped each time the schema
is altered.

feat: Adds internal columns to region metadata

Introduce the concepts of reserved columns and internal columns. Reserved
columns are columns whose names and ids are reserved by the storage engine and
could not be used by the user. Reserved columns usually have special usage.

Reserved columns, except the version column, are also called internal columns
(though the version column could also be thought of as a special kind of
internal column); they are not visible to the user, such as our internal
sequence and value_type columns.

The RegionMetadataBuilder always pushes the internal columns used by the engine
to the columns in metadata. Internal columns are all stored after the user
columns in the columns vector.

To avoid column id collisions, the ids reserved for these columns have the most
significant bit set to 1 (see the sketch later in this message), and the
RegionMetadataBuilder checks the uniqueness of every column id.

chore: Rebase develop and fix compile error

feat: add internal schema to region schema

feat: Add SchemaBuilder to build Schema

feat: Store row key end in region schema metadata

Also move the arrow schema construction to the region::schema mod.

feat: Add SstSchema

refactor: Replace MemtableSchema by RegionSchema

Now when writing sst files, we could use the arrow schema from our sst schema,
which contains the internal columns.

feat: Use SstSchema to read parquet

Adds user_column_end to metadata. When reading a parquet file, converts the
arrow schema into SstSchema, then uses row_key_end and user_column_end to find
the row key parts, value parts and internal columns, instead of using the
timestamp index, which may yield an incorrect index if we don't put the
timestamp at the end of the row key.

Moves the conversion from Batch to arrow Chunk into SstSchema, so the sst mod
doesn't need to care about the order of key, value and internal columns.

test: Add test for Value to serde_json::Value

feat: Add RawRegionMetadata to persist RegionMetadata

test: Add test to RegionSchema

fix: Fix clippy

To fix the clippy::enum_clike_unportable_variant lint, define the column id
offset in ReservedColumnType and compute the final column id in
ReservedColumnId's const method.

refactor: Move batch/chunk conversion to SstSchema

The parquet ChunkStream now holds the SstSchema and uses its method to convert
a Chunk into a Batch.
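For illustration, here is a minimal sketch of the reserved column id scheme
described above. The variant names, discriminant values and ColumnId width used
below are assumptions for illustration only; this patch itself only relies on
the ReservedColumnId::version(), ReservedColumnId::sequence() and
ReservedColumnId::value_type() constructors from store-api's consts module.

    // Assumed sketch: reserved/internal column ids live in a separate id
    // space by setting the most significant bit, so they cannot collide
    // with ids assigned to user columns.
    type ColumnId = u32; // assumed width of column ids

    enum ReservedColumnType {
        Version = 0,
        Sequence = 1,
        ValueType = 2,
    }

    struct ReservedColumnId;

    impl ReservedColumnId {
        // All reserved ids have the most significant bit set to 1.
        const BASE: ColumnId = 1 << (ColumnId::BITS - 1);

        const fn version() -> ColumnId {
            // Bitwise OR the base with the per-column offset.
            Self::BASE | ReservedColumnType::Version as ColumnId
        }

        const fn sequence() -> ColumnId {
            Self::BASE | ReservedColumnType::Sequence as ColumnId
        }

        const fn value_type() -> ColumnId {
            Self::BASE | ReservedColumnType::ValueType as ColumnId
        }
    }

    fn main() {
        // Reserved ids all sit in the upper half of the id space.
        assert_eq!(0x8000_0000, ReservedColumnId::version());
        assert_eq!(0x8000_0001, ReservedColumnId::sequence());
        assert_eq!(0x8000_0002, ReservedColumnId::value_type());
    }

Keeping reserved ids in the upper half of the id space avoids collisions with
user-assigned column ids, while the RegionMetadataBuilder still verifies that
every column id is unique when columns are pushed.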
chore: Address CR comment Also add a test for pushing internal column to RegionMetadataBuilder chore: Address CR comment chore: Use bitwise or to compute column id * chore: Address CR comment --- src/catalog/src/error.rs | 8 +- src/catalog/src/system.rs | 18 +- src/datanode/src/server/grpc/insert.rs | 9 +- src/datanode/src/sql.rs | 9 +- src/datanode/src/sql/create.rs | 7 +- src/datanode/tests/http_test.rs | 2 +- src/datanode/tests/test_util.rs | 6 +- src/datatypes/src/error.rs | 11 + src/datatypes/src/lib.rs | 1 + src/datatypes/src/schema.rs | 211 ++++++-- src/datatypes/src/value.rs | 129 ++++- src/datatypes/src/vectors/constant.rs | 2 +- src/storage/benches/memtable/util/mod.rs | 8 +- src/storage/src/error.rs | 27 +- src/storage/src/flush.rs | 2 +- src/storage/src/lib.rs | 1 + src/storage/src/manifest/action.rs | 42 +- src/storage/src/manifest/region.rs | 13 +- src/storage/src/memtable.rs | 11 +- src/storage/src/memtable/btree.rs | 20 +- src/storage/src/memtable/inserter.rs | 15 +- src/storage/src/memtable/schema.rs | 32 -- src/storage/src/memtable/tests.rs | 15 +- src/storage/src/metadata.rs | 473 +++++++++++++----- src/storage/src/proto/write_batch.rs | 15 +- src/storage/src/region.rs | 11 +- src/storage/src/region/tests.rs | 2 +- src/storage/src/region/writer.rs | 2 +- src/storage/src/schema.rs | 450 +++++++++++++++++ src/storage/src/snapshot.rs | 4 +- src/storage/src/sst/parquet.rs | 184 ++----- src/storage/src/test_util/schema_util.rs | 7 +- src/storage/src/version.rs | 17 +- src/store-api/src/storage.rs | 2 +- src/store-api/src/storage/consts.rs | 59 ++- src/table-engine/src/table/test_util.rs | 8 +- .../src/table/test_util/mock_engine.rs | 4 +- 37 files changed, 1369 insertions(+), 468 deletions(-) delete mode 100644 src/storage/src/memtable/schema.rs create mode 100644 src/storage/src/schema.rs diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index b98d7a31c0..9f5f880fdd 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -81,12 +81,6 @@ pub enum Error { source: common_recordbatch::error::Error, }, - #[snafu(display("Failed to build table schema for system catalog table"))] - SystemCatalogSchema { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display( "Failed to insert table creation record to system catalog, source: {}", source @@ -116,7 +110,7 @@ impl ErrorExt for Error { | Error::OpenTable { .. } | Error::ReadSystemCatalog { .. } | Error::InsertTableRecord { .. } => StatusCode::StorageUnavailable, - Error::RegisterTable { .. } | Error::SystemCatalogSchema { .. } => StatusCode::Internal, + Error::RegisterTable { .. } => StatusCode::Internal, Error::TableExists { .. 
} => StatusCode::TableAlreadyExists, } } diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index fb2a1eb677..25ad3a3eaa 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -7,7 +7,7 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use common_time::util; use datatypes::prelude::{ConcreteDataType, ScalarVector}; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; use datatypes::vectors::{BinaryVector, Int64Vector, UInt8Vector}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; @@ -22,7 +22,7 @@ use crate::consts::{ }; use crate::error::{ CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu, - OpenSystemCatalogSnafu, Result, SystemCatalogSchemaSnafu, ValueDeserializeSnafu, + OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu, }; pub const ENTRY_TYPE_INDEX: usize = 0; @@ -68,7 +68,7 @@ impl SystemCatalogTable { table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), table_id: SYSTEM_CATALOG_TABLE_ID, }; - let schema = Arc::new(build_system_catalog_schema()?); + let schema = Arc::new(build_system_catalog_schema()); let ctx = EngineContext::default(); if let Some(table) = engine @@ -107,12 +107,14 @@ impl SystemCatalogTable { } /// Build system catalog table schema. -/// A system catalog table consists of 4 columns, namely +/// A system catalog table consists of 6 columns, namely /// - entry_type: type of entry in current row, can be any variant of [EntryType]. /// - key: a binary encoded key of entry, differs according to different entry type. /// - timestamp: currently not used. /// - value: JSON-encoded value of entry's metadata. -fn build_system_catalog_schema() -> Result { +/// - gmt_created: create time of this metadata. +/// - gmt_modified: last updated time of this metadata. +fn build_system_catalog_schema() -> Schema { let cols = vec![ ColumnSchema::new( "entry_type".to_string(), @@ -146,7 +148,11 @@ fn build_system_catalog_schema() -> Result { ), ]; - Schema::with_timestamp_index(cols, TIMESTAMP_INDEX).context(SystemCatalogSchemaSnafu) + // The schema of this table must be valid. 
+ SchemaBuilder::from(cols) + .timestamp_index(2) + .build() + .unwrap() } pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest { diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 1287011a71..0a38ea501a 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -193,7 +193,7 @@ mod tests { use common_recordbatch::SendableRecordBatchStream; use datatypes::{ data_type::ConcreteDataType, - schema::{ColumnSchema, Schema, SchemaRef}, + schema::{ColumnSchema, SchemaBuilder, SchemaRef}, value::Value, }; use table::error::Result as TableResult; @@ -280,7 +280,12 @@ mod tests { ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), ]; - Arc::new(Schema::with_timestamp_index(column_schemas, 3).unwrap()) + Arc::new( + SchemaBuilder::from(column_schemas) + .timestamp_index(3) + .build() + .unwrap(), + ) } async fn scan( &self, diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index e42252c4b8..248c36a5db 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -64,7 +64,7 @@ mod tests { use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; + use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use datatypes::value::Value; use log_store::fs::noop::NoopLogStore; use object_store::{backend::fs::Backend, ObjectStore}; @@ -96,7 +96,12 @@ mod tests { ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), ]; - Arc::new(Schema::with_timestamp_index(column_schemas, 3).unwrap()) + Arc::new( + SchemaBuilder::from(column_schemas) + .timestamp_index(3) + .build() + .unwrap(), + ) } async fn scan( &self, diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index f0f39db0cd..b91429a582 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use catalog::RegisterTableRequest; use common_telemetry::tracing::info; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::schema::{ColumnSchema, SchemaBuilder}; use query::query_engine::Output; use snafu::{OptionExt, ResultExt}; use sql::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName, TableConstraint}; @@ -134,7 +134,10 @@ impl SqlHandler { .collect::>>()?; let schema = Arc::new( - Schema::with_timestamp_index(columns_schemas, ts_index).context(CreateSchemaSnafu)?, + SchemaBuilder::from(columns_schemas) + .timestamp_index(ts_index) + .build() + .context(CreateSchemaSnafu)?, ); let request = CreateTableRequest { diff --git a/src/datanode/tests/http_test.rs b/src/datanode/tests/http_test.rs index 7a44b687f9..2720bdff66 100644 --- a/src/datanode/tests/http_test.rs +++ b/src/datanode/tests/http_test.rs @@ -40,7 +40,7 @@ async fn test_sql_api() { let body = res.text().await; assert_eq!( body, - r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"# + r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"# ); } diff --git a/src/datanode/tests/test_util.rs b/src/datanode/tests/test_util.rs index 8a86b3b40a..38d9377dc1 100644 
--- a/src/datanode/tests/test_util.rs +++ b/src/datanode/tests/test_util.rs @@ -5,7 +5,7 @@ use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::Instance; use datatypes::data_type::ConcreteDataType; -use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::schema::{ColumnSchema, SchemaBuilder}; use snafu::ResultExt; use table::engine::EngineContext; use table::engine::TableEngineRef; @@ -62,7 +62,9 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> { table_name: table_name.to_string(), desc: Some(" a test table".to_string()), schema: Arc::new( - Schema::with_timestamp_index(column_schemas, 3) + SchemaBuilder::from(column_schemas) + .timestamp_index(3) + .build() .expect("ts is expected to be timestamp column"), ), create_if_not_exists: true, diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index be367f5bd1..3a358b43af 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -42,6 +42,17 @@ pub enum Error { backtrace: Backtrace, }, + #[snafu(display( + "Failed to parse version in schema meta, value: {}, source: {}", + value, + source + ))] + ParseSchemaVersion { + value: String, + source: std::num::ParseIntError, + backtrace: Backtrace, + }, + #[snafu(display("Invalid timestamp index: {}", index))] InvalidTimestampIndex { index: usize, backtrace: Backtrace }, } diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs index 91c76247d2..19f0fdfc18 100644 --- a/src/datatypes/src/lib.rs +++ b/src/datatypes/src/lib.rs @@ -15,3 +15,4 @@ pub mod value; pub mod vectors; pub use arrow; +pub use error::{Error, Result}; diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 0b0ec7c211..1f5729464a 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -1,7 +1,8 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; -use arrow::datatypes::{Field, Metadata, Schema as ArrowSchema}; +pub use arrow::datatypes::Metadata; +use arrow::datatypes::{Field, Schema as ArrowSchema}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -9,9 +10,7 @@ use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Error, Result}; const TIMESTAMP_INDEX_KEY: &str = "greptime:timestamp_index"; - -// TODO(yingwen): consider assign a version to schema so compare schema can be -// done by compare version. +const VERSION_KEY: &str = "greptime:version"; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ColumnSchema { @@ -34,6 +33,7 @@ impl ColumnSchema { } } +/// A common schema, should be immutable. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Schema { column_schemas: Vec, @@ -44,43 +44,27 @@ pub struct Schema { /// Timestamp key column is the column holds the timestamp and forms part of /// the primary key. None means there is no timestamp key column. timestamp_index: Option, + /// Version of the schema. + /// + /// Initial value is zero. The version should bump after altering schema. + version: u32, } impl Schema { + /// Initial version of the schema. 
+ pub const INITIAL_VERSION: u32 = 0; + pub fn new(column_schemas: Vec) -> Schema { - let (arrow_schema, name_to_index) = collect_column_schemas(&column_schemas); - - Schema { - column_schemas, - name_to_index, - arrow_schema: Arc::new(arrow_schema), - timestamp_index: None, - } - } - - pub fn with_timestamp_index( - column_schemas: Vec, - timestamp_index: usize, - ) -> Result { - let (arrow_schema, name_to_index) = collect_column_schemas(&column_schemas); - let mut metadata = BTreeMap::new(); - metadata.insert(TIMESTAMP_INDEX_KEY.to_string(), timestamp_index.to_string()); - let arrow_schema = Arc::new(arrow_schema.with_metadata(metadata)); - - validate_timestamp_index(&column_schemas, timestamp_index)?; - - Ok(Schema { - column_schemas, - name_to_index, - arrow_schema, - timestamp_index: Some(timestamp_index), - }) + // Builder won't fail + SchemaBuilder::from(column_schemas).build().unwrap() } + #[inline] pub fn arrow_schema(&self) -> &Arc { &self.arrow_schema } + #[inline] pub fn column_schemas(&self) -> &[ColumnSchema] { &self.column_schemas } @@ -106,11 +90,89 @@ impl Schema { pub fn timestamp_column(&self) -> Option<&ColumnSchema> { self.timestamp_index.map(|idx| &self.column_schemas[idx]) } + + #[inline] + pub fn version(&self) -> u32 { + self.version + } + + #[inline] + pub fn metadata(&self) -> &Metadata { + &self.arrow_schema.metadata + } } -fn collect_column_schemas( - column_schemas: &[ColumnSchema], -) -> (ArrowSchema, HashMap) { +#[derive(Default)] +pub struct SchemaBuilder { + column_schemas: Vec, + name_to_index: HashMap, + fields: Vec, + timestamp_index: Option, + version: u32, + metadata: Metadata, +} + +impl From> for SchemaBuilder { + fn from(column_schemas: Vec) -> SchemaBuilder { + SchemaBuilder::from_columns(column_schemas) + } +} + +impl SchemaBuilder { + pub fn from_columns(column_schemas: Vec) -> Self { + let (fields, name_to_index) = collect_fields(&column_schemas); + + Self { + column_schemas, + name_to_index, + fields, + ..Default::default() + } + } + + /// Set timestamp index. + /// + /// The validation of timestamp column is done in `build()`. + pub fn timestamp_index(mut self, timestamp_index: usize) -> Self { + self.timestamp_index = Some(timestamp_index); + self + } + + pub fn version(mut self, version: u32) -> Self { + self.version = version; + self + } + + /// Add key value pair to metadata. + /// + /// Old metadata with same key would be overwritten. 
+ pub fn add_metadata(mut self, key: impl Into, value: impl Into) -> Self { + self.metadata.insert(key.into(), value.into()); + self + } + + pub fn build(mut self) -> Result { + if let Some(timestamp_index) = self.timestamp_index { + validate_timestamp_index(&self.column_schemas, timestamp_index)?; + self.metadata + .insert(TIMESTAMP_INDEX_KEY.to_string(), timestamp_index.to_string()); + } + self.metadata + .insert(VERSION_KEY.to_string(), self.version.to_string()); + + let arrow_schema = ArrowSchema::from(self.fields).with_metadata(self.metadata); + + Ok(Schema { + column_schemas: self.column_schemas, + name_to_index: self.name_to_index, + arrow_schema: Arc::new(arrow_schema), + timestamp_index: self.timestamp_index, + version: self.version, + }) + } +} + +fn collect_fields(column_schemas: &[ColumnSchema]) -> (Vec, HashMap) { let mut fields = Vec::with_capacity(column_schemas.len()); let mut name_to_index = HashMap::with_capacity(column_schemas.len()); for (index, column_schema) in column_schemas.iter().enumerate() { @@ -119,7 +181,7 @@ fn collect_column_schemas( name_to_index.insert(column_schema.name.clone(), index); } - (ArrowSchema::from(fields), name_to_index) + (fields, name_to_index) } fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> { @@ -183,16 +245,28 @@ impl TryFrom> for Schema { if let Some(index) = timestamp_index { validate_timestamp_index(&column_schemas, index)?; } + let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?; Ok(Self { column_schemas, name_to_index, arrow_schema, timestamp_index, + version, }) } } +impl TryFrom for Schema { + type Error = Error; + + fn try_from(arrow_schema: ArrowSchema) -> Result { + let arrow_schema = Arc::new(arrow_schema); + + Schema::try_from(arrow_schema) + } +} + fn try_parse_index(metadata: &Metadata, key: &str) -> Result> { if let Some(value) = metadata.get(key) { let index = value @@ -205,6 +279,18 @@ fn try_parse_index(metadata: &Metadata, key: &str) -> Result> { } } +fn try_parse_version(metadata: &Metadata, key: &str) -> Result { + if let Some(value) = metadata.get(key) { + let version = value + .parse() + .context(error::ParseSchemaVersionSnafu { value })?; + + Ok(version) + } else { + Ok(Schema::INITIAL_VERSION) + } +} + #[cfg(test)] mod tests { use arrow::datatypes::DataType as ArrowDataType; @@ -223,6 +309,14 @@ mod tests { assert_eq!(column_schema, new_column_schema); } + #[test] + fn test_build_empty_schema() { + let schema = SchemaBuilder::default().build().unwrap(); + assert_eq!(0, schema.num_columns()); + + assert!(SchemaBuilder::default().timestamp_index(0).build().is_err()); + } + #[test] fn test_schema_no_timestamp() { let column_schemas = vec![ @@ -234,6 +328,7 @@ mod tests { assert_eq!(2, schema.num_columns()); assert!(schema.timestamp_index().is_none()); assert!(schema.timestamp_column().is_none()); + assert_eq!(Schema::INITIAL_VERSION, schema.version()); for column_schema in &column_schemas { let found = schema.column_schema_by_name(&column_schema.name).unwrap(); @@ -241,15 +336,25 @@ mod tests { } assert!(schema.column_schema_by_name("col3").is_none()); - let fields: Vec<_> = column_schemas.iter().map(Field::from).collect(); - let arrow_schema = Arc::new(ArrowSchema::from(fields)); - - let new_schema = Schema::try_from(arrow_schema.clone()).unwrap(); + let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap(); assert_eq!(schema, new_schema); assert_eq!(column_schemas, schema.column_schemas()); - assert_eq!(arrow_schema, 
*schema.arrow_schema()); - assert_eq!(arrow_schema, *new_schema.arrow_schema()); + } + + #[test] + fn test_metadata() { + let column_schemas = vec![ColumnSchema::new( + "col1", + ConcreteDataType::int32_datatype(), + false, + )]; + let schema = SchemaBuilder::from(column_schemas) + .add_metadata("k1", "v1") + .build() + .unwrap(); + + assert_eq!("v1", schema.metadata().get("k1").unwrap()); } #[test] @@ -258,10 +363,15 @@ mod tests { ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false), ]; - let schema = Schema::with_timestamp_index(column_schemas.clone(), 1).unwrap(); + let schema = SchemaBuilder::from(column_schemas.clone()) + .timestamp_index(1) + .version(123) + .build() + .unwrap(); assert_eq!(1, schema.timestamp_index().unwrap()); assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap()); + assert_eq!(123, schema.version()); let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap(); assert_eq!(1, schema.timestamp_index().unwrap()); @@ -274,8 +384,17 @@ mod tests { ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false), ]; - assert!(Schema::with_timestamp_index(column_schemas.clone(), 0).is_err()); - assert!(Schema::with_timestamp_index(column_schemas.clone(), 1).is_err()); - assert!(Schema::with_timestamp_index(column_schemas, 2).is_err()); + assert!(SchemaBuilder::from(column_schemas.clone()) + .timestamp_index(0) + .build() + .is_err()); + assert!(SchemaBuilder::from(column_schemas.clone()) + .timestamp_index(1) + .build() + .is_err()); + assert!(SchemaBuilder::from(column_schemas) + .timestamp_index(1) + .build() + .is_err()); } } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 319f98ed24..d40e6cd72b 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -3,7 +3,7 @@ use std::cmp::Ordering; use common_base::bytes::{Bytes, StringBytes}; use datafusion_common::ScalarValue; pub use ordered_float::OrderedFloat; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use crate::prelude::*; @@ -15,7 +15,7 @@ pub type OrderedF64 = OrderedFloat; /// Although compare Value with different data type is allowed, it is recommended to only /// compare Value with same data type. Comparing Value with different data type may not /// behaves as what you expect. 
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub enum Value { Null, @@ -132,30 +132,31 @@ impl From<&[u8]> for Value { } } -impl Serialize for Value { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match self { - Value::Null => serde_json::Value::Null.serialize(serializer), - Value::Boolean(v) => v.serialize(serializer), - Value::UInt8(v) => v.serialize(serializer), - Value::UInt16(v) => v.serialize(serializer), - Value::UInt32(v) => v.serialize(serializer), - Value::UInt64(v) => v.serialize(serializer), - Value::Int8(v) => v.serialize(serializer), - Value::Int16(v) => v.serialize(serializer), - Value::Int32(v) => v.serialize(serializer), - Value::Int64(v) => v.serialize(serializer), - Value::Float32(v) => v.serialize(serializer), - Value::Float64(v) => v.serialize(serializer), - Value::String(bytes) => bytes.serialize(serializer), - Value::Binary(bytes) => bytes.serialize(serializer), - Value::Date(v) => v.serialize(serializer), - Value::DateTime(v) => v.serialize(serializer), - Value::List(_) => unimplemented!(), - } +impl TryFrom for serde_json::Value { + type Error = serde_json::Error; + + fn try_from(value: Value) -> serde_json::Result { + let json_value = match value { + Value::Null => serde_json::Value::Null, + Value::Boolean(v) => serde_json::Value::Bool(v), + Value::UInt8(v) => serde_json::Value::from(v), + Value::UInt16(v) => serde_json::Value::from(v), + Value::UInt32(v) => serde_json::Value::from(v), + Value::UInt64(v) => serde_json::Value::from(v), + Value::Int8(v) => serde_json::Value::from(v), + Value::Int16(v) => serde_json::Value::from(v), + Value::Int32(v) => serde_json::Value::from(v), + Value::Int64(v) => serde_json::Value::from(v), + Value::Float32(v) => serde_json::Value::from(v.0), + Value::Float64(v) => serde_json::Value::from(v.0), + Value::String(bytes) => serde_json::Value::String(bytes.as_utf8().to_string()), + Value::Binary(bytes) => serde_json::to_value(bytes)?, + Value::Date(v) => serde_json::Value::Number(v.into()), + Value::DateTime(v) => serde_json::Value::Number(v.into()), + Value::List(v) => serde_json::to_value(v)?, + }; + + Ok(json_value) } } @@ -437,4 +438,80 @@ mod tests { assert_eq!(ScalarValue::Boolean(None), Value::Null.into()); } + + fn to_json(value: Value) -> serde_json::Value { + value.try_into().unwrap() + } + + #[test] + fn test_to_json_value() { + assert_eq!(serde_json::Value::Null, to_json(Value::Null)); + assert_eq!(serde_json::Value::Bool(true), to_json(Value::Boolean(true))); + assert_eq!( + serde_json::Value::Number(20u8.into()), + to_json(Value::UInt8(20)) + ); + assert_eq!( + serde_json::Value::Number(20i8.into()), + to_json(Value::Int8(20)) + ); + assert_eq!( + serde_json::Value::Number(2000u16.into()), + to_json(Value::UInt16(2000)) + ); + assert_eq!( + serde_json::Value::Number(2000i16.into()), + to_json(Value::Int16(2000)) + ); + assert_eq!( + serde_json::Value::Number(3000u32.into()), + to_json(Value::UInt32(3000)) + ); + assert_eq!( + serde_json::Value::Number(3000i32.into()), + to_json(Value::Int32(3000)) + ); + assert_eq!( + serde_json::Value::Number(4000u64.into()), + to_json(Value::UInt64(4000)) + ); + assert_eq!( + serde_json::Value::Number(4000i64.into()), + to_json(Value::Int64(4000)) + ); + assert_eq!( + serde_json::Value::from(125.0f32), + to_json(Value::Float32(125.0.into())) + ); + assert_eq!( + serde_json::Value::from(125.0f64), + to_json(Value::Float64(125.0.into())) + ); + 
assert_eq!( + serde_json::Value::String(String::from("hello")), + to_json(Value::String(StringBytes::from("hello"))) + ); + assert_eq!( + serde_json::Value::from(b"world".as_slice()), + to_json(Value::Binary(Bytes::from(b"world".as_slice()))) + ); + assert_eq!( + serde_json::Value::Number(5000i32.into()), + to_json(Value::Date(5000)) + ); + assert_eq!( + serde_json::Value::Number(5000i64.into()), + to_json(Value::DateTime(5000)) + ); + + let json_value: serde_json::Value = + serde_json::from_str(r#"{"items":[{"Int32":123}],"datatype":{"Int32":{}}}"#).unwrap(); + assert_eq!( + json_value, + to_json(Value::List(ListValue { + items: Some(Box::new(vec![Value::Int32(123)])), + datatype: ConcreteDataType::int32_datatype(), + })) + ); + } } diff --git a/src/datatypes/src/vectors/constant.rs b/src/datatypes/src/vectors/constant.rs index 62e0dcafb8..5841fb6213 100644 --- a/src/datatypes/src/vectors/constant.rs +++ b/src/datatypes/src/vectors/constant.rs @@ -114,7 +114,7 @@ impl Serializable for ConstantVector { fn serialize_to_json(&self) -> Result> { std::iter::repeat(self.try_get(0)?) .take(self.len()) - .map(serde_json::to_value) + .map(serde_json::Value::try_from) .collect::>() .context(SerializeSnafu) } diff --git a/src/storage/benches/memtable/util/mod.rs b/src/storage/benches/memtable/util/mod.rs index a5cdba93f1..80b7a4c481 100644 --- a/src/storage/benches/memtable/util/mod.rs +++ b/src/storage/benches/memtable/util/mod.rs @@ -4,21 +4,23 @@ pub mod schema_util; use datatypes::type_id::LogicalTypeId; use storage::{ - memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableRef, MemtableSchema}, + memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableRef}, metadata::RegionMetadata, + schema::RegionSchemaRef, }; use crate::memtable::util::regiondesc_util::RegionDescBuilder; pub const TIMESTAMP_NAME: &str = "timestamp"; -pub fn schema_for_test() -> MemtableSchema { +pub fn schema_for_test() -> RegionSchemaRef { let desc = RegionDescBuilder::new("bench") .push_value_column(("v1", LogicalTypeId::UInt64, true)) .push_value_column(("v2", LogicalTypeId::String, true)) .build(); let metadata: RegionMetadata = desc.try_into().unwrap(); - MemtableSchema::new(metadata.columns_row_key) + + metadata.schema().clone() } pub fn new_memtable() -> MemtableRef { diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index a445053574..772e2a4cc0 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -175,14 +175,10 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Parquet file schema is not valid: {}", msg))] - SequenceColumnNotFound { msg: String, backtrace: Backtrace }, - - #[snafu(display("Parquet file schema is not valid, msg: {}, source: {}", msg, source))] + #[snafu(display("Parquet file schema is invalid, source: {}", source))] InvalidParquetSchema { - msg: String, #[snafu(backtrace)] - source: datatypes::error::Error, + source: crate::schema::Error, }, #[snafu(display("Region is under {} state, cannot proceed operation", state))] @@ -221,6 +217,20 @@ pub enum Error { given: SequenceNumber, backtrace: Backtrace, }, + + #[snafu(display("Failed to convert sst schema, file: {}, source: {}", file, source))] + ConvertSstSchema { + file: String, + #[snafu(backtrace)] + source: crate::schema::Error, + }, + + #[snafu(display("Invalid raw region metadata, region: {}, source: {}", region, source))] + InvalidRawRegion { + region: String, + #[snafu(backtrace)] + source: MetadataError, + }, } pub type Result = std::result::Result; @@ -245,10 +255,11 @@ impl ErrorExt 
for Error { | DecodeMetaActionList { .. } | Readline { .. } | InvalidParquetSchema { .. } - | SequenceColumnNotFound { .. } | WalDataCorrupted { .. } | VersionNotFound { .. } - | SequenceNotMonotonic { .. } => StatusCode::Unexpected, + | SequenceNotMonotonic { .. } + | ConvertSstSchema { .. } + | InvalidRawRegion { .. } => StatusCode::Unexpected, FlushIo { .. } | WriteParquet { .. } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index cc21e85f2e..059f21ce66 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -199,7 +199,7 @@ impl FlushJob { async fn write_to_manifest(&self, file_metas: &[FileMeta]) -> Result { let edit = RegionEdit { - region_version: self.shared.version_control.metadata().version, + region_version: self.shared.version_control.metadata().version(), flushed_sequence: self.flush_sequence, files_to_add: file_metas.to_vec(), files_to_remove: Vec::default(), diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 8b31f52122..627eb37c17 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -14,6 +14,7 @@ pub mod metadata; mod proto; mod read; mod region; +pub mod schema; mod snapshot; mod sst; mod sync; diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index 6b8fd3de69..beb8479802 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -14,12 +14,38 @@ use crate::error::{ ReadlineSnafu, Result, }; use crate::manifest::helper; -use crate::metadata::{RegionMetadataRef, VersionNumber}; +use crate::metadata::{ColumnFamilyMetadata, ColumnMetadata, VersionNumber}; use crate::sst::FileMeta; +/// Minimal data that could be used to persist and recover [RegionMetadata](crate::metadata::RegionMetadata). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RawRegionMetadata { + pub id: RegionId, + pub name: String, + pub columns: RawColumnsMetadata, + pub column_families: RawColumnFamiliesMetadata, + pub version: VersionNumber, +} + +/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RawColumnsMetadata { + pub columns: Vec, + pub row_key_end: usize, + pub timestamp_key_index: usize, + pub enable_version_column: bool, + pub user_column_end: usize, +} + +/// Minimal data that could be used to persist and recover [ColumnFamiliesMetadata](crate::metadata::ColumnFamiliesMetadata). 
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RawColumnFamiliesMetadata { + pub column_families: Vec, +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct RegionChange { - pub metadata: RegionMetadataRef, + pub metadata: RawRegionMetadata, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -35,12 +61,6 @@ pub struct RegionEdit { pub files_to_remove: Vec, } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct RegionMetaActionList { - pub actions: Vec, - pub prev_version: ManifestVersion, -} - #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum RegionMetaAction { Protocol(ProtocolAction), @@ -49,6 +69,12 @@ pub enum RegionMetaAction { Edit(RegionEdit), } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct RegionMetaActionList { + pub actions: Vec, + pub prev_version: ManifestVersion, +} + impl RegionMetaActionList { pub fn with_action(action: RegionMetaAction) -> Self { Self { diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 92aea2980e..633262977f 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -14,6 +14,7 @@ mod tests { use super::*; use crate::manifest::test_utils::*; + use crate::metadata::RegionMetadata; #[tokio::test] async fn test_region_manifest() { @@ -43,7 +44,7 @@ mod tests { manifest .update(RegionMetaActionList::with_action(RegionMetaAction::Change( RegionChange { - metadata: region_meta.clone(), + metadata: (&*region_meta).into(), }, ))) .await @@ -58,7 +59,10 @@ mod tests { match action { RegionMetaAction::Change(c) => { - assert_eq!(c.metadata, region_meta); + assert_eq!( + RegionMetadata::try_from(c.metadata.clone()).unwrap(), + *region_meta + ); } _ => unreachable!(), } @@ -79,7 +83,10 @@ mod tests { let action = &action_list.actions[0]; match action { RegionMetaAction::Change(c) => { - assert_eq!(c.metadata, region_meta); + assert_eq!( + RegionMetadata::try_from(c.metadata.clone()).unwrap(), + *region_meta + ); } _ => unreachable!(), } diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index d8aac2523e..0c3adaf9db 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -1,6 +1,5 @@ mod btree; mod inserter; -mod schema; #[cfg(test)] pub mod tests; mod version; @@ -13,9 +12,9 @@ use store_api::storage::{consts, SequenceNumber, ValueType}; use crate::error::Result; use crate::memtable::btree::BTreeMemtable; pub use crate::memtable::inserter::Inserter; -pub use crate::memtable::schema::MemtableSchema; pub use crate::memtable::version::{MemtableSet, MemtableVersion}; use crate::read::Batch; +use crate::schema::RegionSchemaRef; /// Unique id for memtables under same region. pub type MemtableId = u32; @@ -24,7 +23,7 @@ pub type MemtableId = u32; pub trait Memtable: Send + Sync + std::fmt::Debug { fn id(&self) -> MemtableId; - fn schema(&self) -> &MemtableSchema; + fn schema(&self) -> RegionSchemaRef; /// Write key/values to the memtable. /// @@ -83,7 +82,7 @@ pub enum RowOrdering { /// as an async trait. pub trait BatchIterator: Iterator> + Send + Sync { /// Returns the schema of this iterator. - fn schema(&self) -> &MemtableSchema; + fn schema(&self) -> RegionSchemaRef; /// Returns the ordering of the output rows from this iterator. 
fn ordering(&self) -> RowOrdering; @@ -92,7 +91,7 @@ pub trait BatchIterator: Iterator> + Send + Sync { pub type BoxedBatchIterator = Box; pub trait MemtableBuilder: Send + Sync + std::fmt::Debug { - fn build(&self, id: MemtableId, schema: MemtableSchema) -> MemtableRef; + fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef; } pub type MemtableBuilderRef = Arc; @@ -136,7 +135,7 @@ impl KeyValues { pub struct DefaultMemtableBuilder; impl MemtableBuilder for DefaultMemtableBuilder { - fn build(&self, id: MemtableId, schema: MemtableSchema) -> MemtableRef { + fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef { Arc::new(BTreeMemtable::new(id, schema)) } } diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 24911045c6..08ed6c7908 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -15,10 +15,10 @@ use store_api::storage::{SequenceNumber, ValueType}; use crate::error::Result; use crate::memtable::{ - BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, - MemtableSchema, RowOrdering, + BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, RowOrdering, }; use crate::read::Batch; +use crate::schema::RegionSchemaRef; type RwLockMap = RwLock>; @@ -28,13 +28,13 @@ type RwLockMap = RwLock>; #[derive(Debug)] pub struct BTreeMemtable { id: MemtableId, - schema: MemtableSchema, + schema: RegionSchemaRef, map: Arc, estimated_bytes: AtomicUsize, } impl BTreeMemtable { - pub fn new(id: MemtableId, schema: MemtableSchema) -> BTreeMemtable { + pub fn new(id: MemtableId, schema: RegionSchemaRef) -> BTreeMemtable { BTreeMemtable { id, schema, @@ -49,8 +49,8 @@ impl Memtable for BTreeMemtable { self.id } - fn schema(&self) -> &MemtableSchema { - &self.schema + fn schema(&self) -> RegionSchemaRef { + self.schema.clone() } fn write(&self, kvs: &KeyValues) -> Result<()> { @@ -81,14 +81,14 @@ impl Memtable for BTreeMemtable { struct BTreeIterator { ctx: IterContext, - schema: MemtableSchema, + schema: RegionSchemaRef, map: Arc, last_key: Option, } impl BatchIterator for BTreeIterator { - fn schema(&self) -> &MemtableSchema { - &self.schema + fn schema(&self) -> RegionSchemaRef { + self.schema.clone() } fn ordering(&self) -> RowOrdering { @@ -105,7 +105,7 @@ impl Iterator for BTreeIterator { } impl BTreeIterator { - fn new(ctx: IterContext, schema: MemtableSchema, map: Arc) -> BTreeIterator { + fn new(ctx: IterContext, schema: RegionSchemaRef, map: Arc) -> BTreeIterator { BTreeIterator { ctx, schema, diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index a54680615d..85a4fea6a4 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -293,10 +293,9 @@ mod tests { use store_api::storage::{PutOperation, WriteRequest}; use super::*; - use crate::memtable::{ - DefaultMemtableBuilder, IterContext, MemtableBuilder, MemtableId, MemtableSchema, - }; + use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder, MemtableId}; use crate::metadata::RegionMetadata; + use crate::schema::RegionSchemaRef; use crate::test_util::descriptor_util::RegionDescBuilder; use crate::test_util::write_batch_util; @@ -558,7 +557,7 @@ mod tests { ) } - fn new_memtable_schema() -> MemtableSchema { + fn new_region_schema() -> RegionSchemaRef { let desc = RegionDescBuilder::new("test") .timestamp(("ts", LogicalTypeId::Int64, false)) .push_value_column(("value", LogicalTypeId::Int64, true)) 
@@ -566,7 +565,7 @@ mod tests { .build(); let metadata: RegionMetadata = desc.try_into().unwrap(); - MemtableSchema::new(metadata.columns_row_key) + metadata.schema().clone() } fn put_batch(batch: &mut WriteBatch, data: &[(i64, Option)]) { @@ -579,7 +578,7 @@ mod tests { batch.put(put_data).unwrap(); } - fn new_memtable_set(time_ranges: &[RangeMillis], schema: &MemtableSchema) -> MemtableSet { + fn new_memtable_set(time_ranges: &[RangeMillis], schema: &RegionSchemaRef) -> MemtableSet { let mut set = MemtableSet::new(); for (id, range) in time_ranges.iter().enumerate() { let mem = DefaultMemtableBuilder {}.build(id as MemtableId, schema.clone()); @@ -619,7 +618,7 @@ mod tests { let sequence = 11111; let bucket_duration = 100; let time_ranges = new_time_ranges(&[0], bucket_duration); - let memtable_schema = new_memtable_schema(); + let memtable_schema = new_region_schema(); let memtables = new_memtable_set(&time_ranges, &memtable_schema); let mut inserter = Inserter::new( sequence, @@ -657,7 +656,7 @@ mod tests { let sequence = 11111; let bucket_duration = 100; let time_ranges = new_time_ranges(&[0, 100, 200], bucket_duration); - let memtable_schema = new_memtable_schema(); + let memtable_schema = new_region_schema(); let memtables = new_memtable_set(&time_ranges, &memtable_schema); let mut inserter = Inserter::new( sequence, diff --git a/src/storage/src/memtable/schema.rs b/src/storage/src/memtable/schema.rs deleted file mode 100644 index e23c210de0..0000000000 --- a/src/storage/src/memtable/schema.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::metadata::{ColumnMetadata, ColumnsRowKeyMetadataRef}; - -#[derive(Clone, Debug, PartialEq)] -pub struct MemtableSchema { - columns_row_key: ColumnsRowKeyMetadataRef, -} - -impl MemtableSchema { - pub fn new(columns_row_key: ColumnsRowKeyMetadataRef) -> MemtableSchema { - MemtableSchema { columns_row_key } - } - - #[inline] - pub fn row_key_columns(&self) -> impl Iterator { - self.columns_row_key.iter_row_key_columns() - } - - #[inline] - pub fn value_columns(&self) -> impl Iterator { - self.columns_row_key.iter_value_columns() - } - - #[inline] - pub fn num_row_key_columns(&self) -> usize { - self.columns_row_key.num_row_key_columns() - } - - #[inline] - pub fn num_value_columns(&self) -> usize { - self.columns_row_key.num_value_columns() - } -} diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index 6f31213193..0cf53bda5c 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -4,6 +4,7 @@ use datatypes::vectors::{Int64VectorBuilder, UInt64VectorBuilder}; use super::*; use crate::metadata::RegionMetadata; +use crate::schema::RegionSchemaRef; use crate::test_util::descriptor_util::RegionDescBuilder; // For simplicity, all memtables in test share same memtable id. @@ -12,15 +13,15 @@ const MEMTABLE_ID: MemtableId = 1; // Schema for testing memtable: // - key: Int64(timestamp), UInt64(version), // - value: UInt64 -pub fn schema_for_test() -> MemtableSchema { - // Just build a region desc and use its columns_row_key metadata. +pub fn schema_for_test() -> RegionSchemaRef { + // Just build a region desc and use its columns metadata. 
let desc = RegionDescBuilder::new("test") .enable_version_column(true) .push_value_column(("v1", LogicalTypeId::UInt64, true)) .build(); let metadata: RegionMetadata = desc.try_into().unwrap(); - MemtableSchema::new(metadata.columns_row_key) + metadata.schema().clone() } fn kvs_for_test_with_index( @@ -128,10 +129,8 @@ fn check_iter_content( assert_eq!(keys.len(), index); } -// TODO(yingwen): Check size of the returned batch. - struct MemtableTester { - schema: MemtableSchema, + schema: RegionSchemaRef, builders: Vec, } @@ -172,7 +171,7 @@ impl MemtableTester { } struct TestContext { - schema: MemtableSchema, + schema: RegionSchemaRef, memtable: MemtableRef, } @@ -216,7 +215,7 @@ fn write_iter_memtable_case(ctx: &TestContext) { ..Default::default() }; let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); - assert_eq!(ctx.schema, *iter.schema()); + assert_eq!(ctx.schema, iter.schema()); assert_eq!(RowOrdering::Key, iter.ordering()); check_iter_content( diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs index b9df711905..8e84d5eb9a 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -4,30 +4,42 @@ use std::sync::Arc; use common_error::prelude::*; use datatypes::data_type::ConcreteDataType; use serde::{Deserialize, Serialize}; -use snafu::ensure; +use snafu::{ensure, OptionExt}; use store_api::storage::{ - consts, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId, - ColumnId, ColumnSchema, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptor, Schema, - SchemaRef, + consts::{self, ReservedColumnId}, + ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId, ColumnId, + ColumnSchema, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptor, Schema, SchemaRef, }; +use crate::manifest::action::{RawColumnFamiliesMetadata, RawColumnsMetadata, RawRegionMetadata}; +use crate::schema::{RegionSchema, RegionSchemaRef}; + /// Error for handling metadata. #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Column name already exists, name: {}", name))] + #[snafu(display("Column name {} already exists", name))] ColNameExists { name: String, backtrace: Backtrace }, - #[snafu(display("Column family name already exists, name: {}", name))] + #[snafu(display("Column family name {} already exists", name))] CfNameExists { name: String, backtrace: Backtrace }, - #[snafu(display("Column family id already exists, id: {}", id))] + #[snafu(display("Column family id {} already exists", id))] CfIdExists { id: ColumnId, backtrace: Backtrace }, + #[snafu(display("Column id {} already exists", id))] + ColIdExists { id: ColumnId, backtrace: Backtrace }, + #[snafu(display("Failed to build schema, source: {}", source))] InvalidSchema { - source: datatypes::error::Error, - backtrace: Backtrace, + #[snafu(backtrace)] + source: crate::schema::Error, }, + + #[snafu(display("Column name {} is reserved by the system", name))] + ReservedColumn { name: String, backtrace: Backtrace }, + + #[snafu(display("Missing timestamp key column"))] + MissingTimestamp { backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -48,34 +60,27 @@ impl RegionMetaImpl { impl RegionMeta for RegionMetaImpl { fn schema(&self) -> &SchemaRef { - &self.metadata.schema + self.metadata.user_schema() } } pub type VersionNumber = u32; -// TODO(yingwen): Make some fields of metadata private. +// TODO(yingwen): We may need to hold a list of history schema. /// In memory metadata of region. 
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct RegionMetadata { // The following fields are immutable. id: RegionId, name: String, // The following fields are mutable. - /// Schema of the region. - /// - /// Holding a [SchemaRef] to allow converting into `SchemaRef`/`arrow::SchemaRef` - /// conveniently. The fields order in `SchemaRef` **must** be consistent with - /// columns order in [ColumnsMetadata] to ensure the projection index of a field - /// is correct. - pub schema: SchemaRef, - pub columns_row_key: ColumnsRowKeyMetadataRef, - pub column_families: ColumnFamiliesMetadata, - /// Version of the metadata. Version is set to zero initially and bumped once the - /// metadata have been altered. - pub version: VersionNumber, + /// Latest schema of the region. + schema: RegionSchemaRef, + pub columns: ColumnsMetadataRef, + column_families: ColumnFamiliesMetadata, + version: VersionNumber, } impl RegionMetadata { @@ -88,66 +93,171 @@ impl RegionMetadata { pub fn name(&self) -> &str { &self.name } + + #[inline] + pub fn schema(&self) -> &RegionSchemaRef { + &self.schema + } + + #[inline] + pub fn user_schema(&self) -> &SchemaRef { + self.schema.user_schema() + } + + #[inline] + pub fn version(&self) -> u32 { + self.schema.version() + } } pub type RegionMetadataRef = Arc; +impl From<&RegionMetadata> for RawRegionMetadata { + fn from(data: &RegionMetadata) -> RawRegionMetadata { + RawRegionMetadata { + id: data.id, + name: data.name.clone(), + columns: (&*data.columns).into(), + column_families: (&data.column_families).into(), + version: data.version, + } + } +} + +impl TryFrom for RegionMetadata { + type Error = Error; + + fn try_from(raw: RawRegionMetadata) -> Result { + let columns = Arc::new(ColumnsMetadata::from(raw.columns)); + let schema = + Arc::new(RegionSchema::new(columns.clone(), raw.version).context(InvalidSchemaSnafu)?); + + Ok(RegionMetadata { + id: raw.id, + name: raw.name, + schema, + columns, + column_families: raw.column_families.into(), + version: raw.version, + }) + } +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ColumnMetadata { pub cf_id: ColumnFamilyId, pub desc: ColumnDescriptor, } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq)] pub struct ColumnsMetadata { - /// All columns, in `(key columns, timestamp, [version,] value columns)` order. + /// All columns. /// - /// Columns order should be consistent with fields order in [SchemaRef]. - pub columns: Vec, + /// Columns are organized in the following order: + /// ```text + /// key columns, timestamp, [version,] value columns, internal columns + /// ``` + /// + /// The key columns, timestamp and version forms the row key. + columns: Vec, /// Maps column name to index of columns, used to fast lookup column by name. - pub name_to_col_index: HashMap, -} - -#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] -pub struct RowKeyMetadata { + name_to_col_index: HashMap, /// Exclusive end index of row key columns. row_key_end: usize, /// Index of timestamp key column. - pub timestamp_key_index: usize, + timestamp_key_index: usize, /// If version column is enabled, then the last column of key columns is a /// version column. - pub enable_version_column: bool, + enable_version_column: bool, + /// Exclusive end index of user columns. + /// + /// Columns in `[user_column_end..)` are internal columns. 
+ user_column_end: usize, } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct ColumnsRowKeyMetadata { - columns: ColumnsMetadata, - row_key: RowKeyMetadata, -} - -impl ColumnsRowKeyMetadata { +impl ColumnsMetadata { + /// Returns an iterator to all row key columns. + /// + /// Row key columns includes all key columns, the timestamp column and the + /// optional version column. pub fn iter_row_key_columns(&self) -> impl Iterator { - self.columns.columns.iter().take(self.row_key.row_key_end) + self.columns.iter().take(self.row_key_end) } + /// Returns an iterator to all value columns (internal columns are excluded). pub fn iter_value_columns(&self) -> impl Iterator { - self.columns.columns.iter().skip(self.row_key.row_key_end) + self.columns[self.row_key_end..self.user_column_end].iter() + } + + pub fn iter_user_columns(&self) -> impl Iterator { + self.columns.iter().take(self.user_column_end) + } + + pub fn iter_all_columns(&self) -> impl Iterator { + self.columns.iter() } #[inline] pub fn num_row_key_columns(&self) -> usize { - self.row_key.row_key_end + self.row_key_end } #[inline] pub fn num_value_columns(&self) -> usize { - self.columns.columns.len() - self.row_key.row_key_end + self.user_column_end - self.row_key_end + } + + #[inline] + pub fn timestamp_key_index(&self) -> usize { + self.timestamp_key_index + } + + #[inline] + pub fn row_key_end(&self) -> usize { + self.row_key_end + } + + #[inline] + pub fn user_column_end(&self) -> usize { + self.user_column_end } } -pub type ColumnsRowKeyMetadataRef = Arc; +pub type ColumnsMetadataRef = Arc; -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +impl From<&ColumnsMetadata> for RawColumnsMetadata { + fn from(data: &ColumnsMetadata) -> RawColumnsMetadata { + RawColumnsMetadata { + columns: data.columns.clone(), + row_key_end: data.row_key_end, + timestamp_key_index: data.timestamp_key_index, + enable_version_column: data.enable_version_column, + user_column_end: data.user_column_end, + } + } +} + +impl From for ColumnsMetadata { + fn from(raw: RawColumnsMetadata) -> ColumnsMetadata { + let name_to_col_index = raw + .columns + .iter() + .enumerate() + .map(|(i, col)| (col.desc.name.clone(), i)) + .collect(); + + ColumnsMetadata { + columns: raw.columns, + name_to_col_index, + row_key_end: raw.row_key_end, + timestamp_key_index: raw.timestamp_key_index, + enable_version_column: raw.enable_version_column, + user_column_end: raw.user_column_end, + } + } +} + +#[derive(Clone, Debug, PartialEq)] pub struct ColumnFamiliesMetadata { /// Map column family id to column family metadata. id_to_cfs: HashMap, @@ -159,6 +269,24 @@ impl ColumnFamiliesMetadata { } } +impl From<&ColumnFamiliesMetadata> for RawColumnFamiliesMetadata { + fn from(data: &ColumnFamiliesMetadata) -> RawColumnFamiliesMetadata { + let column_families = data.id_to_cfs.values().cloned().collect(); + RawColumnFamiliesMetadata { column_families } + } +} + +impl From for ColumnFamiliesMetadata { + fn from(raw: RawColumnFamiliesMetadata) -> ColumnFamiliesMetadata { + let id_to_cfs = raw + .column_families + .into_iter() + .map(|cf| (cf.cf_id, cf)) + .collect(); + ColumnFamiliesMetadata { id_to_cfs } + } +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct ColumnFamilyMetadata { /// Column family name. @@ -189,6 +317,7 @@ impl TryFrom for RegionMetadata { } } +// TODO(yingwen): Add a builder to build ColumnMetadata and refactor this builder. 
#[derive(Default)] struct RegionMetadataBuilder { id: RegionId, @@ -196,8 +325,13 @@ struct RegionMetadataBuilder { columns: Vec, column_schemas: Vec, name_to_col_index: HashMap, + /// Column id set, used to validate column id uniqueness. + column_ids: HashSet, - row_key: RowKeyMetadata, + // Row key metadata: + row_key_end: usize, + timestamp_key_index: Option, + enable_version_column: bool, id_to_cfs: HashMap, cf_names: HashSet, @@ -224,7 +358,7 @@ impl RegionMetadataBuilder { } // TODO(yingwen): Validate this is a timestamp column. - let timestamp_key_index = self.columns.len(); + self.timestamp_key_index = Some(self.columns.len()); self.push_row_key_column(key.timestamp)?; if key.enable_version_column { @@ -233,13 +367,8 @@ impl RegionMetadataBuilder { self.push_row_key_column(version_col)?; } - let row_key_end = self.columns.len(); - - self.row_key = RowKeyMetadata { - row_key_end, - timestamp_key_index, - enable_version_column: key.enable_version_column, - }; + self.row_key_end = self.columns.len(); + self.enable_version_column = key.enable_version_column; Ok(self) } @@ -274,29 +403,33 @@ impl RegionMetadataBuilder { Ok(self) } - fn build(self) -> Result { - let schema = if self.column_schemas.is_empty() { - Arc::new(Schema::new(self.column_schemas)) - } else { - Arc::new( - Schema::with_timestamp_index(self.column_schemas, self.row_key.timestamp_key_index) - .context(InvalidSchemaSnafu)?, - ) - }; - let columns = ColumnsMetadata { + fn build(mut self) -> Result { + let timestamp_key_index = self.timestamp_key_index.context(MissingTimestampSnafu)?; + + let user_column_end = self.columns.len(); + // Setup internal columns. + for internal_desc in internal_column_descs() { + self.push_new_column(consts::DEFAULT_CF_ID, internal_desc)?; + } + + let columns = Arc::new(ColumnsMetadata { columns: self.columns, name_to_col_index: self.name_to_col_index, - }; - let columns_row_key = Arc::new(ColumnsRowKeyMetadata { - columns, - row_key: self.row_key, + row_key_end: self.row_key_end, + timestamp_key_index, + enable_version_column: self.enable_version_column, + user_column_end, }); + let schema = Arc::new( + RegionSchema::new(columns.clone(), Schema::INITIAL_VERSION) + .context(InvalidSchemaSnafu)?, + ); Ok(RegionMetadata { id: self.id, name: self.name, schema, - columns_row_key, + columns, column_families: ColumnFamiliesMetadata { id_to_cfs: self.id_to_cfs, }, @@ -311,22 +444,35 @@ impl RegionMetadataBuilder { } fn push_value_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> { + ensure!( + !is_internal_value_column(&desc.name), + ReservedColumnSnafu { name: &desc.name } + ); + + self.push_new_column(cf_id, desc) + } + + fn push_new_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> { ensure!( !self.name_to_col_index.contains_key(&desc.name), ColNameExistsSnafu { name: &desc.name } ); + ensure!( + !self.column_ids.contains(&desc.id), + ColIdExistsSnafu { id: desc.id } + ); let column_schema = ColumnSchema::from(&desc); let column_name = desc.name.clone(); + let column_id = desc.id; let meta = ColumnMetadata { cf_id, desc }; - // TODO(yingwen): Store cf_id to metadata in field. 
- let column_index = self.columns.len(); self.columns.push(meta); self.column_schemas.push(column_schema); self.name_to_col_index.insert(column_name, column_index); + self.column_ids.insert(column_id); Ok(()) } @@ -334,7 +480,7 @@ impl RegionMetadataBuilder { fn version_column_desc() -> ColumnDescriptor { ColumnDescriptorBuilder::new( - consts::VERSION_COLUMN_ID, + ReservedColumnId::version(), consts::VERSION_COLUMN_NAME.to_string(), ConcreteDataType::uint64_datatype(), ) @@ -343,6 +489,36 @@ fn version_column_desc() -> ColumnDescriptor { .unwrap() } +fn internal_column_descs() -> [ColumnDescriptor; 2] { + [ + ColumnDescriptorBuilder::new( + ReservedColumnId::sequence(), + consts::SEQUENCE_COLUMN_NAME.to_string(), + ConcreteDataType::uint64_datatype(), + ) + .is_nullable(false) + .build() + .unwrap(), + ColumnDescriptorBuilder::new( + ReservedColumnId::value_type(), + consts::VALUE_TYPE_COLUMN_NAME.to_string(), + ConcreteDataType::uint8_datatype(), + ) + .is_nullable(false) + .build() + .unwrap(), + ] +} + +/// Returns true if this is an internal column for value column. +#[inline] +fn is_internal_value_column(column_name: &str) -> bool { + matches!( + column_name, + consts::SEQUENCE_COLUMN_NAME | consts::VALUE_TYPE_COLUMN_NAME + ) +} + // TODO(yingwen): Add tests for using invalid row_key/cf to build metadata. #[cfg(test)] mod tests { @@ -378,33 +554,95 @@ mod tests { let metadata = RegionMetadata::try_from(desc).unwrap(); assert_eq!(region_name, metadata.name); - assert_eq!(expect_schema, metadata.schema); - assert_eq!(2, metadata.columns_row_key.num_row_key_columns()); - assert_eq!(1, metadata.columns_row_key.num_value_columns()); + assert_eq!(expect_schema, *metadata.user_schema()); + assert_eq!(2, metadata.columns.num_row_key_columns()); + assert_eq!(1, metadata.columns.num_value_columns()); } #[test] fn test_build_empty_region_metadata() { - let metadata = RegionMetadataBuilder::default().build().unwrap(); - assert!(metadata.schema.column_schemas().is_empty()); + let err = RegionMetadataBuilder::default().build().err().unwrap(); + assert!(matches!(err, Error::MissingTimestamp { .. })); + } - assert!(metadata.columns_row_key.columns.columns.is_empty()); - assert_eq!(0, metadata.columns_row_key.num_row_key_columns()); - assert!(metadata - .columns_row_key - .iter_row_key_columns() - .next() - .is_none()); - assert_eq!(0, metadata.columns_row_key.num_value_columns()); - assert!(metadata - .columns_row_key - .iter_value_columns() - .next() - .is_none()); + #[test] + fn test_build_metadata_duplicate_name() { + let cf = ColumnFamilyDescriptorBuilder::default() + .push_column( + ColumnDescriptorBuilder::new(4, "v1", ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .push_column( + ColumnDescriptorBuilder::new(5, "v1", ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .build() + .unwrap(); + let err = RegionMetadataBuilder::new() + .add_column_family(cf) + .err() + .unwrap(); + assert!(matches!(err, Error::ColNameExists { .. 
})); + } - assert!(metadata.column_families.id_to_cfs.is_empty()); + #[test] + fn test_build_metadata_internal_name() { + let names = [consts::SEQUENCE_COLUMN_NAME, consts::VALUE_TYPE_COLUMN_NAME]; + for name in names { + let cf = ColumnFamilyDescriptorBuilder::default() + .push_column( + ColumnDescriptorBuilder::new(5, name, ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .build() + .unwrap(); + let err = RegionMetadataBuilder::new() + .add_column_family(cf) + .err() + .unwrap(); + assert!(matches!(err, Error::ReservedColumn { .. })); + } + } - assert_eq!(0, metadata.version); + #[test] + fn test_build_metadata_duplicate_id() { + let cf = ColumnFamilyDescriptorBuilder::default() + .push_column( + ColumnDescriptorBuilder::new(4, "v1", ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .push_column( + ColumnDescriptorBuilder::new(4, "v2", ConcreteDataType::int64_datatype()) + .build() + .unwrap(), + ) + .build() + .unwrap(); + let err = RegionMetadataBuilder::new() + .add_column_family(cf) + .err() + .unwrap(); + assert!(matches!(err, Error::ColIdExists { .. })); + + let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::int64_datatype()) + .is_nullable(false) + .build() + .unwrap(); + let row_key = RowKeyDescriptorBuilder::new(timestamp) + .push_column( + ColumnDescriptorBuilder::new(2, "k1", ConcreteDataType::int64_datatype()) + .is_nullable(false) + .build() + .unwrap(), + ) + .build() + .unwrap(); + let err = RegionMetadataBuilder::new().row_key(row_key).err().unwrap(); + assert!(matches!(err, Error::ColIdExists { .. })); } fn new_metadata(enable_version_column: bool) -> RegionMetadata { @@ -454,30 +692,30 @@ mod tests { Some(1), ); - assert_eq!(expect_schema, metadata.schema); + assert_eq!(expect_schema, *metadata.user_schema()); - // 3 columns - assert_eq!(3, metadata.columns_row_key.columns.columns.len()); + // 3 user columns and 2 internal columns + assert_eq!(5, metadata.columns.columns.len()); // 2 row key columns - assert_eq!(2, metadata.columns_row_key.num_row_key_columns()); + assert_eq!(2, metadata.columns.num_row_key_columns()); let row_key_names: Vec<_> = metadata - .columns_row_key + .columns .iter_row_key_columns() .map(|column| &column.desc.name) .collect(); assert_eq!(["k1", "ts"], &row_key_names[..]); // 1 value column - assert_eq!(1, metadata.columns_row_key.num_value_columns()); + assert_eq!(1, metadata.columns.num_value_columns()); let value_names: Vec<_> = metadata - .columns_row_key + .columns .iter_value_columns() .map(|column| &column.desc.name) .collect(); assert_eq!(["v1"], &value_names[..]); // Check timestamp index. - assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index); + assert_eq!(1, metadata.columns.timestamp_key_index); // Check version column. - assert!(!metadata.columns_row_key.row_key.enable_version_column); + assert!(!metadata.columns.enable_version_column); assert!(metadata .column_families @@ -502,14 +740,14 @@ mod tests { Some(1), ); - assert_eq!(expect_schema, metadata.schema); + assert_eq!(expect_schema, *metadata.user_schema()); - // 4 columns - assert_eq!(4, metadata.columns_row_key.columns.columns.len()); + // 4 user columns and 2 internal columns. 
+ assert_eq!(6, metadata.columns.columns.len()); // 3 row key columns - assert_eq!(3, metadata.columns_row_key.num_row_key_columns()); + assert_eq!(3, metadata.columns.num_row_key_columns()); let row_key_names: Vec<_> = metadata - .columns_row_key + .columns .iter_row_key_columns() .map(|column| &column.desc.name) .collect(); @@ -518,16 +756,25 @@ mod tests { &row_key_names[..] ); // 1 value column - assert_eq!(1, metadata.columns_row_key.num_value_columns()); + assert_eq!(1, metadata.columns.num_value_columns()); let value_names: Vec<_> = metadata - .columns_row_key + .columns .iter_value_columns() .map(|column| &column.desc.name) .collect(); assert_eq!(["v1"], &value_names[..]); // Check timestamp index. - assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index); + assert_eq!(1, metadata.columns.timestamp_key_index); // Check version column. - assert!(metadata.columns_row_key.row_key.enable_version_column); + assert!(metadata.columns.enable_version_column); + } + + #[test] + fn test_convert_between_raw() { + let metadata = new_metadata(true); + let raw = RawRegionMetadata::from(&metadata); + + let converted = RegionMetadata::try_from(raw).unwrap(); + assert_eq!(metadata, converted); } } diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs index 2dcc5f2231..fe29451e87 100644 --- a/src/storage/src/proto/write_batch.rs +++ b/src/storage/src/proto/write_batch.rs @@ -36,11 +36,10 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Invalid timestamp index: {}", index))] - InvalidTimestampIndex { - index: usize, + #[snafu(display("Failed to convert schema, source: {}", source))] + ConvertSchema { + #[snafu(backtrace)] source: datatypes::error::Error, - backtrace: Backtrace, }, } @@ -81,10 +80,10 @@ impl TryFrom for schema::SchemaRef { let schema: schema::SchemaRef = match schema.timestamp_index { Some(index) => Arc::new( - schema::Schema::with_timestamp_index(column_schemas, index.value as usize) - .context(InvalidTimestampIndexSnafu { - index: index.value as usize, - })?, + schema::SchemaBuilder::from(column_schemas) + .timestamp_index(index.value as usize) + .build() + .context(ConvertSchemaSnafu)?, ), None => Arc::new(schema::Schema::new(column_schemas)), }; diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 84788f7ad4..fce78bbf79 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_telemetry::logging; use datatypes::schema::SchemaRef; -use snafu::ensure; +use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::manifest::{ self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator, @@ -103,7 +103,7 @@ impl RegionImpl { .update(RegionMetaActionList::new(vec![ RegionMetaAction::Protocol(ProtocolAction::new()), RegionMetaAction::Change(RegionChange { - metadata: metadata.clone(), + metadata: (&*metadata).into(), }), ])) .await?; @@ -206,8 +206,13 @@ impl RegionImpl { for action in action_list.actions { match (action, version) { (RegionMetaAction::Change(c), None) => { + let region = c.metadata.name.clone(); + let region_metadata = c + .metadata + .try_into() + .context(error::InvalidRawRegionSnafu { region })?; version = Some(Version::with_manifest_version( - c.metadata, + Arc::new(region_metadata), last_manifest_version, )); for (manifest_version, action) in actions.drain(..) 
{ diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 7b7f7ce969..93fc267260 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -197,7 +197,7 @@ async fn test_recover_region_manifets() { manifest .update(RegionMetaActionList::with_action(RegionMetaAction::Change( RegionChange { - metadata: region_meta.clone(), + metadata: (&*region_meta).into(), }, ))) .await diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 74bb9632a7..aac1d1031f 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -309,7 +309,7 @@ impl WriterInner { && memtables_to_add.get_by_range(range).is_none() { // Memtable for this range is missing, need to create a new memtable. - let memtable_schema = current_version.memtable_schema(); + let memtable_schema = current_version.schema().clone(); let id = self.alloc_memtable_id(); let memtable = self.memtable_builder.build(id, memtable_schema); memtables_to_add.insert(*range, memtable); diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs new file mode 100644 index 0000000000..06ad6ad8ac --- /dev/null +++ b/src/storage/src/schema.rs @@ -0,0 +1,450 @@ +use std::sync::Arc; + +use common_error::prelude::*; +use datatypes::arrow::array::Array; +use datatypes::arrow::chunk::Chunk; +use datatypes::arrow::datatypes::Schema as ArrowSchema; +use datatypes::prelude::Vector; +use datatypes::schema::Metadata; +use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector}; +use serde::{Deserialize, Serialize}; +use snafu::ensure; +use store_api::storage::{consts, ColumnSchema, Schema, SchemaBuilder, SchemaRef}; + +use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef}; +use crate::read::Batch; + +const ROW_KEY_END_KEY: &str = "greptime:storage:row_key_end"; +const USER_COLUMN_END_KEY: &str = "greptime:storage:user_column_end"; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Failed to build schema, source: {}", source))] + BuildSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to convert from arrow schema, source: {}", source))] + ConvertArrowSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Invalid internal column index in arrow schema"))] + InvalidIndex { backtrace: Backtrace }, + + #[snafu(display("Missing metadata {} in arrow schema", key))] + MissingMeta { key: String, backtrace: Backtrace }, + + #[snafu(display("Missing column {} in arrow schema", column))] + MissingColumn { + column: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to parse index in schema meta, value: {}, source: {}", + value, + source + ))] + ParseIndex { + value: String, + source: std::num::ParseIntError, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to convert arrow chunk to batch, name: {}, source: {}", + name, + source + ))] + ConvertChunk { + name: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, +} + +pub type Result = std::result::Result; + +/// Schema of region. +/// +/// The `RegionSchema` has the knowledge of reserved and internal columns. +/// Reserved columns are columns that their names, ids are reserved by the storage +/// engine, and could not be used by the user. Reserved columns usually have +/// special usage. 
Reserved columns, except the version column, are also +/// called internal columns (though the version column could also be thought of as a +/// special kind of internal column); they are not visible to the user, such as our +/// internal sequence and value_type columns. +/// +/// The user schema is the schema that only contains columns the user can see, +/// i.e. the schema as the user created it. +#[derive(Debug, PartialEq)] +pub struct RegionSchema { + /// Schema that only contains columns that the user defined, excluding internal columns + /// that are reserved and used by the storage engine. + /// + /// Holding a [SchemaRef] to allow converting into `SchemaRef`/`arrow::SchemaRef` + /// conveniently. The field order in `SchemaRef` **must** be consistent with + /// the column order in [ColumnsMetadata] to ensure the projection index of a field + /// is correct. + user_schema: SchemaRef, + /// SST schema that contains all columns of the region, including all internal columns. + sst_schema: SstSchema, + /// Metadata of columns. + columns: ColumnsMetadataRef, +} + +impl RegionSchema { + pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result<RegionSchema> { + let user_schema = Arc::new(build_user_schema(&columns, version)?); + let sst_schema = SstSchema::new(&columns, version)?; + + debug_assert_eq!(user_schema.version(), sst_schema.version()); + debug_assert_eq!(version, user_schema.version()); + + Ok(RegionSchema { + user_schema, + sst_schema, + columns, + }) + } + + /// Returns the schema of the region, excluding internal columns that are used by + /// the storage engine. + #[inline] + pub fn user_schema(&self) -> &SchemaRef { + &self.user_schema + } + + /// Returns the schema for sst, which contains all columns used by the region. + #[inline] + pub fn sst_schema(&self) -> &SstSchema { + &self.sst_schema + } + + #[inline] + pub fn row_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> { + self.columns.iter_row_key_columns() + } + + #[inline] + pub fn value_columns(&self) -> impl Iterator<Item = &ColumnMetadata> { + self.columns.iter_value_columns() + } + + #[inline] + pub fn num_row_key_columns(&self) -> usize { + self.columns.num_row_key_columns() + } + + #[inline] + pub fn num_value_columns(&self) -> usize { + self.columns.num_value_columns() + } + + #[inline] + pub fn version(&self) -> u32 { + self.user_schema.version() + } +} + +pub type RegionSchemaRef = Arc<RegionSchema>; + +/// Schema of SST. +/// +/// Only contains a reference to schema and some indices, so it should be cheap to clone.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SstSchema { + schema: SchemaRef, + row_key_end: usize, + user_column_end: usize, +} + +impl SstSchema { + fn new(columns: &ColumnsMetadata, version: u32) -> Result { + let column_schemas: Vec<_> = columns + .iter_all_columns() + .map(|col| ColumnSchema::from(&col.desc)) + .collect(); + + let schema = SchemaBuilder::from(column_schemas) + .timestamp_index(columns.timestamp_key_index()) + .version(version) + .add_metadata(ROW_KEY_END_KEY, columns.row_key_end().to_string()) + .add_metadata(USER_COLUMN_END_KEY, columns.user_column_end().to_string()) + .build() + .context(BuildSchemaSnafu)?; + + let user_column_end = columns.user_column_end(); + assert_eq!( + consts::SEQUENCE_COLUMN_NAME, + schema.column_schemas()[user_column_end].name + ); + assert_eq!( + consts::VALUE_TYPE_COLUMN_NAME, + schema.column_schemas()[user_column_end + 1].name + ); + + Ok(SstSchema { + schema: Arc::new(schema), + row_key_end: columns.row_key_end(), + user_column_end, + }) + } + + #[inline] + pub fn version(&self) -> u32 { + self.schema.version() + } + + #[inline] + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + #[inline] + pub fn arrow_schema(&self) -> &Arc { + self.schema.arrow_schema() + } + + pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> Chunk> { + assert_eq!( + self.schema.num_columns(), + // key columns + value columns + sequence + value_type + batch.keys.len() + batch.values.len() + 2 + ); + + Chunk::new( + batch + .keys + .iter() + .map(|v| v.to_arrow_array()) + .chain(batch.values.iter().map(|v| v.to_arrow_array())) + .chain(std::iter::once(batch.sequences.to_arrow_array())) + .chain(std::iter::once(batch.value_types.to_arrow_array())) + .collect(), + ) + } + + pub fn arrow_chunk_to_batch(&self, chunk: &Chunk>) -> Result { + let keys = self + .row_key_indices() + .map(|i| { + Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu { + name: self.column_name(i), + }) + }) + .collect::>()?; + let sequences = UInt64Vector::try_from_arrow_array(&chunk[self.sequence_index()].clone()) + .context(ConvertChunkSnafu { + name: consts::SEQUENCE_COLUMN_NAME, + })?; + let value_types = UInt8Vector::try_from_arrow_array( + &chunk[self.value_type_index()].clone(), + ) + .context(ConvertChunkSnafu { + name: consts::VALUE_TYPE_COLUMN_NAME, + })?; + let values = self + .value_indices() + .map(|i| { + Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu { + name: self.column_name(i), + }) + }) + .collect::>()?; + + Ok(Batch { + keys, + sequences, + value_types, + values, + }) + } + + #[inline] + fn sequence_index(&self) -> usize { + self.user_column_end + } + + #[inline] + fn value_type_index(&self) -> usize { + self.user_column_end + 1 + } + + #[inline] + fn row_key_indices(&self) -> impl Iterator { + 0..self.row_key_end + } + + #[inline] + fn value_indices(&self) -> impl Iterator { + self.row_key_end..self.user_column_end + } + + #[inline] + fn column_name(&self, idx: usize) -> &str { + &self.schema.column_schemas()[idx].name + } +} + +impl TryFrom for SstSchema { + type Error = Error; + + fn try_from(arrow_schema: ArrowSchema) -> Result { + let schema = Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?; + // Recover other metadata from schema. + let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?; + let user_column_end = parse_index_from_metadata(schema.metadata(), USER_COLUMN_END_KEY)?; + + // There should be sequence and value type columns. 
+ ensure!( + consts::SEQUENCE_COLUMN_NAME == schema.column_schemas()[user_column_end].name, + InvalidIndexSnafu + ); + ensure!( + consts::VALUE_TYPE_COLUMN_NAME == schema.column_schemas()[user_column_end + 1].name, + InvalidIndexSnafu + ); + + Ok(SstSchema { + schema: Arc::new(schema), + row_key_end, + user_column_end, + }) + } +} + +fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result { + let value = metadata.get(key).context(MissingMetaSnafu { key })?; + value.parse().context(ParseIndexSnafu { value }) +} + +// Now user schema don't have extra metadata like sst schema. +fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result { + let column_schemas: Vec<_> = columns + .iter_user_columns() + .map(|col| ColumnSchema::from(&col.desc)) + .collect(); + + SchemaBuilder::from(column_schemas) + .timestamp_index(columns.timestamp_key_index()) + .version(version) + .build() + .context(BuildSchemaSnafu) +} + +#[cfg(test)] +mod tests { + use datatypes::type_id::LogicalTypeId; + use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector}; + + use super::*; + use crate::metadata::RegionMetadata; + use crate::test_util::{descriptor_util::RegionDescBuilder, schema_util}; + + fn new_batch() -> Batch { + let k1 = Int64Vector::from_slice(&[1, 2, 3]); + let timestamp = Int64Vector::from_slice(&[4, 5, 6]); + let v1 = Int64Vector::from_slice(&[7, 8, 9]); + + Batch { + keys: vec![Arc::new(k1), Arc::new(timestamp)], + values: vec![Arc::new(v1)], + sequences: UInt64Vector::from_slice(&[100, 100, 100]), + value_types: UInt8Vector::from_slice(&[0, 0, 0]), + } + } + + fn check_chunk_batch(chunk: &Chunk>, batch: &Batch) { + assert_eq!(5, chunk.columns().len()); + assert_eq!(3, chunk.len()); + + for i in 0..2 { + assert_eq!(chunk[i], batch.keys[i].to_arrow_array()); + } + assert_eq!(chunk[2], batch.values[0].to_arrow_array()); + assert_eq!(chunk[3], batch.sequences.to_arrow_array()); + assert_eq!(chunk[4], batch.value_types.to_arrow_array()); + } + + #[test] + fn test_region_schema() { + let desc = RegionDescBuilder::new("test") + .push_key_column(("k1", LogicalTypeId::Int64, false)) + .push_value_column(("v1", LogicalTypeId::Int64, true)) + .build(); + let metadata: RegionMetadata = desc.try_into().unwrap(); + + let columns = metadata.columns; + let region_schema = RegionSchema::new(columns.clone(), 0).unwrap(); + + let expect_schema = schema_util::new_schema( + &[ + ("k1", LogicalTypeId::Int64, false), + ("timestamp", LogicalTypeId::Int64, false), + ("v1", LogicalTypeId::Int64, true), + ], + Some(1), + ); + + assert_eq!(expect_schema, **region_schema.user_schema()); + + let mut row_keys = region_schema.row_key_columns(); + assert_eq!("k1", row_keys.next().unwrap().desc.name); + assert_eq!("timestamp", row_keys.next().unwrap().desc.name); + assert_eq!(None, row_keys.next()); + assert_eq!(2, region_schema.num_row_key_columns()); + + let mut values = region_schema.value_columns(); + assert_eq!("v1", values.next().unwrap().desc.name); + assert_eq!(None, values.next()); + assert_eq!(1, region_schema.num_value_columns()); + + assert_eq!(0, region_schema.version()); + { + let region_schema = RegionSchema::new(columns, 1234).unwrap(); + assert_eq!(1234, region_schema.version()); + assert_eq!(1234, region_schema.sst_schema().version()); + } + + let sst_schema = region_schema.sst_schema(); + let sst_arrow_schema = sst_schema.arrow_schema(); + let converted_sst_schema = SstSchema::try_from((**sst_arrow_schema).clone()).unwrap(); + + assert_eq!(*sst_schema, converted_sst_schema); + + let 
expect_schema = schema_util::new_schema( + &[ + ("k1", LogicalTypeId::Int64, false), + ("timestamp", LogicalTypeId::Int64, false), + ("v1", LogicalTypeId::Int64, true), + (consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false), + (consts::VALUE_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false), + ], + Some(1), + ); + assert_eq!( + expect_schema.column_schemas(), + sst_schema.schema().column_schemas() + ); + assert_eq!(3, sst_schema.sequence_index()); + assert_eq!(4, sst_schema.value_type_index()); + let row_key_indices: Vec<_> = sst_schema.row_key_indices().collect(); + assert_eq!([0, 1], &row_key_indices[..]); + let value_indices: Vec<_> = sst_schema.value_indices().collect(); + assert_eq!([2], &value_indices[..]); + + // Test batch and chunk conversion. + let batch = new_batch(); + // Convert batch to chunk. + let chunk = sst_schema.batch_to_arrow_chunk(&batch); + check_chunk_batch(&chunk, &batch); + + // Convert chunk to batch. + let converted_batch = sst_schema.arrow_chunk_to_batch(&chunk).unwrap(); + check_chunk_batch(&chunk, &converted_batch); + } +} diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index ca861a9f96..50adc5e338 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -26,7 +26,7 @@ impl Snapshot for SnapshotImpl { type Reader = ChunkReaderImpl; fn schema(&self) -> &SchemaRef { - self.version.schema() + self.version.user_schema() } async fn scan( @@ -41,7 +41,7 @@ impl Snapshot for SnapshotImpl { let immutables = memtable_version.immutable_memtables(); let mut builder = - ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone()) + ChunkReaderBuilder::new(self.version.user_schema().clone(), self.sst_layer.clone()) .reserve_num_memtables(memtable_version.num_memtables()) .iter_ctx(IterContext { batch_size: ctx.batch_size, diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 8db579f36d..e0799d0463 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -15,22 +15,15 @@ use datatypes::arrow::io::parquet::read::{ use datatypes::arrow::io::parquet::write::{ Compression, Encoding, FileSink, Version, WriteOptions, }; -use datatypes::prelude::{ConcreteDataType, Vector}; -use datatypes::schema::ColumnSchema; -use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector}; use futures_util::sink::SinkExt; use futures_util::{Stream, TryStreamExt}; use object_store::{ObjectStore, SeekableReader}; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::consts; +use snafu::ResultExt; -use crate::error::{ - FlushIoSnafu, InvalidParquetSchemaSnafu, ReadParquetIoSnafu, ReadParquetSnafu, Result, - SequenceColumnNotFoundSnafu, WriteParquetSnafu, -}; -use crate::memtable::{BoxedBatchIterator, MemtableSchema}; -use crate::metadata::ColumnMetadata; +use crate::error::{self, Result}; +use crate::memtable::BoxedBatchIterator; use crate::read::{Batch, BatchReader}; +use crate::schema::SstSchema; use crate::sst; /// Parquet sst writer. @@ -61,20 +54,23 @@ impl<'a> ParquetWriter<'a> { /// A chunk of records yielded from each iteration with a size given /// in config will be written to a single row group. 
async fn write_rows(self, extra_meta: Option>) -> Result<()> { - let schema = memtable_schema_to_arrow_schema(self.iter.schema()); + let region_schema = self.iter.schema(); + let sst_schema = region_schema.sst_schema(); + let schema = sst_schema.arrow_schema(); let object = self.object_store.object(self.file_path); // FIXME(hl): writer size is not used in fs backend so just leave it to 0, // but in s3/azblob backend the Content-Length field of HTTP request is set // to this value. - let writer = object.writer(0).await.context(FlushIoSnafu)?; + let writer = object.writer(0).await.context(error::FlushIoSnafu)?; // now all physical types use plain encoding, maybe let caller to choose encoding for each type. - let encodings = get_encoding_for_schema(&schema, |_| Encoding::Plain); + let encodings = get_encoding_for_schema(&*schema, |_| Encoding::Plain); let mut sink = FileSink::try_new( writer, - schema, + // The file sink needs the `Schema` instead of a reference. + (**schema).clone(), encodings, WriteOptions { write_statistics: true, @@ -82,22 +78,13 @@ impl<'a> ParquetWriter<'a> { version: Version::V2, }, ) - .context(WriteParquetSnafu)?; + .context(error::WriteParquetSnafu)?; for batch in self.iter { let batch = batch?; - sink.send(Chunk::new( - batch - .keys - .iter() - .map(|v| v.to_arrow_array()) - .chain(std::iter::once(batch.sequences.to_arrow_array())) - .chain(std::iter::once(batch.value_types.to_arrow_array())) - .chain(batch.values.iter().map(|v| v.to_arrow_array())) - .collect(), - )) - .await - .context(WriteParquetSnafu)?; + sink.send(sst_schema.batch_to_arrow_chunk(&batch)) + .await + .context(error::WriteParquetSnafu)?; } if let Some(meta) = extra_meta { @@ -105,42 +92,13 @@ impl<'a> ParquetWriter<'a> { sink.metadata.insert(k, Some(v)); } } - sink.close().await.context(WriteParquetSnafu)?; + sink.close().await.context(error::WriteParquetSnafu)?; // FIXME(yingwen): Hack to workaround an [arrow2 BUG](https://github.com/jorgecarleitao/parquet2/issues/162), // upgrading to latest arrow2 can fixed this, but now datafusion is still using an old arrow2 version. - sink.flush().await.context(WriteParquetSnafu) + sink.flush().await.context(error::WriteParquetSnafu) } } -/// Assembles arrow schema from memtable schema info. -/// TODO(hl): implement `From` for `MemtableSchema`/`Physical schema` -fn memtable_schema_to_arrow_schema(schema: &MemtableSchema) -> Schema { - let col_meta_to_field: fn(&ColumnMetadata) -> Field = |col_meta| { - Field::from(&ColumnSchema::new( - col_meta.desc.name.clone(), - col_meta.desc.data_type.clone(), - col_meta.desc.is_nullable, - )) - }; - - let fields = schema - .row_key_columns() - .map(col_meta_to_field) - .chain(std::iter::once(Field::from(&ColumnSchema::new( - consts::SEQUENCE_COLUMN_NAME, - ConcreteDataType::uint64_datatype(), - false, - )))) - .chain(std::iter::once(Field::from(&ColumnSchema::new( - consts::VALUE_TYPE_COLUMN_NAME, - ConcreteDataType::uint8_datatype(), - false, - )))) - .chain(schema.value_columns().map(col_meta_to_field)) - .collect::>(); - Schema::from(fields) -} - fn get_encoding_for_schema Encoding + Clone>( schema: &Schema, map: F, @@ -213,9 +171,11 @@ impl<'a> ParquetReader<'a> { } } + // TODO(yingwen): Projection is not supported now, since field index would change after projection. + // To support projection, we may need to implement some helper methods in schema. 
pub async fn chunk_stream( &self, - projection: Option, + _projection: Option, chunk_size: usize, ) -> Result { let file_path = self.file_path.to_string(); @@ -229,17 +189,17 @@ impl<'a> ParquetReader<'a> { let file_path = self.file_path.to_string(); let mut reader = reader_factory() .await - .context(ReadParquetIoSnafu { file: &file_path })?; + .context(error::ReadParquetIoSnafu { file: &file_path })?; let metadata = read_metadata_async(&mut reader) .await - .context(ReadParquetSnafu { file: &file_path })?; - let schema = infer_schema(&metadata).context(ReadParquetSnafu { file: &file_path })?; + .context(error::ReadParquetSnafu { file: &file_path })?; + let arrow_schema = + infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?; + // Just read all fields. + let projected_fields = arrow_schema.fields.clone(); - let projected_fields = projection.map_or_else(|| schema.fields.clone(), |p| p(&schema)); - let projected_schema = Schema { - fields: projected_fields.clone(), - metadata: schema.metadata, - }; + let sst_schema = SstSchema::try_from(arrow_schema) + .context(error::ConvertSstSchemaSnafu { file: &file_path })?; let chunk_stream = try_stream!({ for rg in metadata.row_groups { @@ -250,36 +210,31 @@ impl<'a> ParquetReader<'a> { Some(chunk_size), ) .await - .context(ReadParquetSnafu { file: &file_path })?; + .context(error::ReadParquetSnafu { file: &file_path })?; let chunks = RowGroupDeserializer::new(column_chunks, rg.num_rows() as usize, None); for maybe_chunk in chunks { let columns_in_chunk = - maybe_chunk.context(ReadParquetSnafu { file: &file_path })?; + maybe_chunk.context(error::ReadParquetSnafu { file: &file_path })?; yield columns_in_chunk; } } }); - ChunkStream::new(projected_schema, Box::pin(chunk_stream)) + ChunkStream::new(sst_schema, Box::pin(chunk_stream)) } } -type ChunkConverter = Box>) -> Result + Send + Sync>; - pub type SendableChunkStream = Pin>>> + Send>>; pub struct ChunkStream { + schema: SstSchema, stream: SendableChunkStream, - converter: ChunkConverter, } impl ChunkStream { - pub fn new(schema: Schema, stream: SendableChunkStream) -> Result { - Ok(Self { - converter: batch_converter_factory(schema)?, - stream, - }) + pub fn new(schema: SstSchema, stream: SendableChunkStream) -> Result { + Ok(Self { schema, stream }) } } @@ -289,63 +244,15 @@ impl BatchReader for ChunkStream { self.stream .try_next() .await? - .map(&self.converter) + .map(|chunk| { + self.schema + .arrow_chunk_to_batch(&chunk) + .context(error::InvalidParquetSchemaSnafu) + }) .transpose() } } -// TODO(hl): Currently rowkey/values/reserved columns are identified by their position in field: -// all fields before `__sequence` column are rowkeys and fields after `__value_type` are values. -// But it would be better to persist rowkey/value columns' positions to Parquet metadata and -// parse `MemtableSchema` from metadata while building BatchReader. -fn batch_converter_factory(schema: Schema) -> Result { - let ts_idx = schema - .fields - .iter() - .position(|f| f.name == consts::SEQUENCE_COLUMN_NAME) - .with_context(|| SequenceColumnNotFoundSnafu { - msg: format!("Schema: {:?}", schema), - })?; - - let field_len = schema.fields.len(); - - macro_rules! handle_err { - ($stmt: expr, $schema: ident) => { - $stmt.with_context(|_| InvalidParquetSchemaSnafu { - msg: format!("Schema type error: {:?}", $schema), - })? 
- }; - } - - let converter = move |c: Chunk>| { - Ok(Batch { - sequences: handle_err!( - UInt64Vector::try_from_arrow_array(&c.arrays()[ts_idx].clone()), - schema - ), - value_types: handle_err!( - UInt8Vector::try_from_arrow_array(&c.arrays()[ts_idx + 1].clone()), - schema - ), - keys: handle_err!( - (0..ts_idx) - .into_iter() - .map(|i| Helper::try_into_vector(&c.arrays()[i].clone())) - .collect::>(), - schema - ), - values: handle_err!( - (ts_idx + 2..field_len) - .into_iter() - .map(|i| Helper::try_into_vector(&c.arrays()[i].clone())) - .collect::>(), - schema - ), - }) - }; - Ok(Box::new(converter)) -} - #[cfg(test)] mod tests { use std::sync::Arc; @@ -398,10 +305,11 @@ mod tests { let reader = std::fs::File::open(dir.path().join(sst_file_name)).unwrap(); let mut file_reader = FileReader::try_new(reader, None, Some(128), None, None).unwrap(); - // chunk schema: timestamp, __version, __sequence, __value_type, v1 + // chunk schema: timestamp, __version, v1, __sequence, __value_type let chunk = file_reader.next().unwrap().unwrap(); assert_eq!(5, chunk.arrays().len()); + // timestamp assert_eq!( Arc::new(Int64Array::from_slice(&[ 1000, 1000, 1001, 2002, 2003, 2003 @@ -409,23 +317,27 @@ mod tests { chunk.arrays()[0] ); + // version assert_eq!( Arc::new(UInt64Array::from_slice(&[1, 2, 1, 1, 1, 5])) as Arc, chunk.arrays()[1] ); + // v1 assert_eq!( - Arc::new(UInt64Array::from_slice(&[10, 10, 10, 10, 10, 10])) as Arc, + Arc::new(UInt64Array::from_slice(&[1, 2, 3, 7, 8, 9])) as Arc, chunk.arrays()[2] ); + // sequence assert_eq!( - Arc::new(UInt8Array::from_slice(&[0, 0, 0, 0, 0, 0])) as Arc, + Arc::new(UInt64Array::from_slice(&[10, 10, 10, 10, 10, 10])) as Arc, chunk.arrays()[3] ); + // value type assert_eq!( - Arc::new(UInt64Array::from_slice(&[1, 2, 3, 7, 8, 9])) as Arc, + Arc::new(UInt8Array::from_slice(&[0, 0, 0, 0, 0, 0])) as Arc, chunk.arrays()[4] ); } diff --git a/src/storage/src/test_util/schema_util.rs b/src/storage/src/test_util/schema_util.rs index d99dbb90b7..ac0602359d 100644 --- a/src/storage/src/test_util/schema_util.rs +++ b/src/storage/src/test_util/schema_util.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use datatypes::prelude::*; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; /// Column definition: (name, datatype, is_nullable) pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool); @@ -16,7 +16,10 @@ pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option) -> .collect(); if let Some(index) = timestamp_index { - Schema::with_timestamp_index(column_schemas, index).unwrap() + SchemaBuilder::from(column_schemas) + .timestamp_index(index) + .build() + .unwrap() } else { Schema::new(column_schemas) } diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index a5fd13f8b0..93f81cd09f 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -14,8 +14,9 @@ use std::time::Duration; use store_api::manifest::ManifestVersion; use store_api::storage::{SchemaRef, SequenceNumber}; -use crate::memtable::{MemtableId, MemtableSchema, MemtableSet, MemtableVersion}; +use crate::memtable::{MemtableId, MemtableSet, MemtableVersion}; use crate::metadata::RegionMetadataRef; +use crate::schema::RegionSchemaRef; use crate::sst::LevelMetas; use crate::sst::{FileHandle, FileMeta}; use crate::sync::CowCell; @@ -170,8 +171,13 @@ impl Version { } #[inline] - pub fn schema(&self) -> &SchemaRef { - &self.metadata.schema + pub fn schema(&self) -> &RegionSchemaRef { + 
self.metadata.schema() + } + + #[inline] + pub fn user_schema(&self) -> &SchemaRef { + self.metadata.user_schema() } #[inline] @@ -199,11 +205,6 @@ impl Version { DEFAULT_BUCKET_DURATION } - #[inline] - pub fn memtable_schema(&self) -> MemtableSchema { - MemtableSchema::new(self.metadata.columns_row_key.clone()) - } - pub fn apply_edit(&mut self, edit: VersionEdit) { let flushed_sequence = edit.flushed_sequence.unwrap_or(self.flushed_sequence); if self.flushed_sequence < flushed_sequence { diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index 8616c981b9..b2cb9a78f1 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -12,7 +12,7 @@ mod snapshot; mod types; pub use datatypes::data_type::ConcreteDataType; -pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +pub use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; pub use self::chunk::{Chunk, ChunkReader}; pub use self::descriptors::*; diff --git a/src/store-api/src/storage/consts.rs b/src/store-api/src/storage/consts.rs index 536e0608cd..93572d609f 100644 --- a/src/store-api/src/storage/consts.rs +++ b/src/store-api/src/storage/consts.rs @@ -2,11 +2,11 @@ use crate::storage::descriptors::{ColumnFamilyId, ColumnId}; -// ---------- Ids reserved for internal column families ------------------------ +// ---------- Reserved column family ids --------------------------------------- /// Column family Id for row key columns. /// -/// This is virtual column family, actually row key columns are not +/// This is a virtual column family, actually row key columns are not /// stored in any column family. pub const KEY_CF_ID: ColumnFamilyId = 0; /// Id for default column family. @@ -14,11 +14,45 @@ pub const DEFAULT_CF_ID: ColumnFamilyId = 1; // ----------------------------------------------------------------------------- -// ---------- Ids reserved for internal columns -------------------------------- +// ---------- Reserved column ids ---------------------------------------------- -// TODO(yingwen): Reserve one bit for internal columns. -/// Column id for version column. -pub const VERSION_COLUMN_ID: ColumnId = 1; +// The reserved column id is too large to be defined as enum value (denied by the +// `clippy::enum_clike_unportable_variant` lint). So we add this enum as offset +// in ReservedColumnId to get the final column id. +enum ReservedColumnType { + Version = 0, + Sequence, + ValueType, +} + +/// Column id reserved by the engine. +/// +/// All reserved column id has MSB (Most Significant Bit) set to 1. +/// +/// Reserved column includes version column and other internal columns. +pub struct ReservedColumnId; + +impl ReservedColumnId { + // Set MSB to 1. + const BASE: ColumnId = 1 << (ColumnId::BITS - 1); + + /// Column id for version column. + /// Version column is a special reserved column that is enabled by user and + /// visible to user. + pub const fn version() -> ColumnId { + Self::BASE | ReservedColumnType::Version as ColumnId + } + + /// Id for `__sequence` column. + pub const fn sequence() -> ColumnId { + Self::BASE | ReservedColumnType::Sequence as ColumnId + } + + /// Id for `__value_type` column. + pub const fn value_type() -> ColumnId { + Self::BASE | ReservedColumnType::ValueType as ColumnId + } +} // ----------------------------------------------------------------------------- @@ -36,6 +70,7 @@ pub const SEQUENCE_COLUMN_NAME: &str = "__sequence"; /// Name for time index constraint name. 
pub const TIME_INDEX_NAME: &str = "__time_index"; +// TODO(yingwen): `__op_type` might be proper than `__value_type`. /// Name for reserved column: value_type pub const VALUE_TYPE_COLUMN_NAME: &str = "__value_type"; @@ -46,3 +81,15 @@ pub const VALUE_TYPE_COLUMN_NAME: &str = "__value_type"; pub const READ_BATCH_SIZE: usize = 256; // ----------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_reserved_id() { + assert_eq!(0x80000000, ReservedColumnId::version()); + assert_eq!(0x80000001, ReservedColumnId::sequence()); + assert_eq!(0x80000002, ReservedColumnId::value_type()); + } +} diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index 293db4b89f..fa68020e5c 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -3,8 +3,7 @@ mod mock_engine; use std::sync::Arc; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::SchemaRef; -use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; use log_store::fs::noop::NoopLogStore; use object_store::{backend::fs::Backend, ObjectStore}; use storage::config::EngineConfig as StorageEngineConfig; @@ -32,7 +31,10 @@ pub fn schema_for_test() -> Schema { ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; - Schema::with_timestamp_index(column_schemas, 0).expect("ts must be timestamp column") + SchemaBuilder::from(column_schemas) + .timestamp_index(0) + .build() + .expect("ts must be timestamp column") } pub type MockMitoEngine = MitoEngine; diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs index be8fd4348b..a18b1390ab 100644 --- a/src/table-engine/src/table/test_util/mock_engine.rs +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -43,7 +43,7 @@ impl Snapshot for MockSnapshot { type Reader = MockChunkReader; fn schema(&self) -> &SchemaRef { - &self.metadata.schema + self.metadata.user_schema() } async fn scan( @@ -52,7 +52,7 @@ impl Snapshot for MockSnapshot { _request: ScanRequest, ) -> Result> { let reader = MockChunkReader { - schema: self.metadata.schema.clone(), + schema: self.metadata.user_schema().clone(), }; Ok(ScanResponse { reader })
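Aside (not part of the patch): the reserved column id scheme added to `consts.rs` above reduces to a single bit test, since every reserved id is the base with the most significant bit set ORed with a small offset. The sketch below is only an illustration of that convention; the `is_reserved_column_id` helper is hypothetical, and the `ColumnId = u32` alias is an assumption consistent with the `ColumnId::BITS` usage and the `0x80000000` values asserted in `test_reserved_id` above.

// Hypothetical sketch, not part of this change: shows how a caller could tell
// reserved/internal column ids apart from user-assigned ids under the MSB convention.
type ColumnId = u32; // assumption: ColumnId is an alias for u32

// Same bit as `ReservedColumnId::BASE` in the patch: the most significant bit.
const RESERVED_BASE: ColumnId = 1 << (ColumnId::BITS - 1);

/// Returns true if the id falls in the range reserved by the storage engine.
fn is_reserved_column_id(id: ColumnId) -> bool {
    (id & RESERVED_BASE) != 0
}

fn main() {
    // User-defined ids keep the MSB clear, so they can never collide with reserved ids.
    assert!(!is_reserved_column_id(4));
    // These match the values asserted in `test_reserved_id` above.
    assert!(is_reserved_column_id(0x8000_0000)); // version
    assert!(is_reserved_column_id(0x8000_0001)); // __sequence
    assert!(is_reserved_column_id(0x8000_0002)); // __value_type
}

Keeping user-assigned ids in the lower half of the id space means the builder only needs the uniqueness check shown in `push_new_column`; no extra range validation is required to avoid collisions with engine-owned columns.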