diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index c0c359550a..5fd802d1ed 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -171,7 +171,7 @@ fn build_system_catalog_schema() -> Schema { // The schema of this table must be valid. SchemaBuilder::try_from(cols) .unwrap() - .timestamp_index(2) + .timestamp_index(Some(2)) .build() .unwrap() } diff --git a/src/common/catalog/src/helper.rs b/src/common/catalog/src/helper.rs index 02dc88e85d..a95ad70d79 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/common/catalog/src/helper.rs @@ -5,7 +5,7 @@ use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize, Serializer}; use snafu::{ensure, OptionExt, ResultExt}; -use table::metadata::{TableId, TableMeta, TableVersion}; +use table::metadata::{RawTableMeta, TableId, TableVersion}; use crate::consts::{CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_KEY_PREFIX}; use crate::error::{ @@ -101,7 +101,7 @@ impl TableKey { pub struct TableValue { pub id: TableId, pub node_id: String, - pub meta: TableMeta, + pub meta: RawTableMeta, } impl TableValue { @@ -204,10 +204,8 @@ impl SchemaValue { #[cfg(test)] mod tests { - use std::sync::Arc; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::schema::{ColumnSchema, RawSchema, Schema}; use super::*; @@ -254,14 +252,14 @@ mod tests { #[test] fn test_serialize_schema() { - let schema_ref = Arc::new(Schema::new(vec![ColumnSchema::new( + let schema = Schema::new(vec![ColumnSchema::new( "name", ConcreteDataType::string_datatype(), true, - )])); + )]); - let meta = TableMeta { - schema: schema_ref, + let meta = RawTableMeta { + schema: RawSchema::from(&schema), engine: "mito".to_string(), created_on: chrono::DateTime::default(), primary_key_indices: vec![0, 1], diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 579c0d96a6..4d52b39767 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -145,7 +145,7 @@ fn create_table_schema(expr: &CreateExpr) -> Result { Ok(Arc::new( SchemaBuilder::try_from(column_schemas) .context(error::CreateSchemaSnafu)? - .timestamp_index(ts_index) + .timestamp_index(Some(ts_index)) .build() .context(error::CreateSchemaSnafu)?, )) @@ -314,7 +314,7 @@ mod tests { Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(1) + .timestamp_index(Some(1)) .build() .unwrap(), ) diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 93d229478b..6007d6db50 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -153,7 +153,7 @@ pub fn build_create_table_request( let schema = Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(timestamp_index) + .timestamp_index(Some(timestamp_index)) .build() .context(error::CreateSchemaSnafu)?, ); @@ -430,7 +430,7 @@ mod tests { let schema = Arc::new( SchemaBuilder::try_from(columns) .unwrap() - .timestamp_index(1) + .timestamp_index(Some(1)) .build() .unwrap(), ); @@ -537,7 +537,7 @@ mod tests { Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(3) + .timestamp_index(Some(3)) .build() .unwrap(), ) diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 2de3806811..2df2262b73 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -133,7 +133,7 @@ mod tests { Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(3) + .timestamp_index(Some(3)) .build() .unwrap(), ) diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index 6887da8a8c..e2e880fa6e 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -143,7 +143,7 @@ impl SqlHandler { let schema = Arc::new( SchemaBuilder::try_from(columns_schemas) .context(CreateSchemaSnafu)? - .timestamp_index(ts_index) + .timestamp_index(Some(ts_index)) .build() .context(CreateSchemaSnafu)?, ); diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 7e284c8f66..ec8f9f6614 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -65,7 +65,7 @@ pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) - schema: Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(3) + .timestamp_index(Some(3)) .build() .expect("ts is expected to be timestamp column"), ), diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index c0d5cd4c60..82fd3c874b 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -1,4 +1,5 @@ mod constraint; +mod raw; use std::collections::HashMap; use std::sync::Arc; @@ -11,6 +12,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, DeserializeSnafu, Error, Result, SerializeSnafu}; pub use crate::schema::constraint::ColumnDefaultConstraint; +pub use crate::schema::raw::RawSchema; use crate::vectors::VectorRef; /// Key used to store column name of the timestamp column in metadata. @@ -104,7 +106,7 @@ impl ColumnSchema { } /// A common schema, should be immutable. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq)] pub struct Schema { column_schemas: Vec, name_to_index: HashMap, @@ -125,19 +127,20 @@ impl Schema { pub const INITIAL_VERSION: u32 = 0; /// Create a schema from a vector of [ColumnSchema]. + /// /// # Panics /// Panics when ColumnSchema's `default_constrait` can't be serialized into json. pub fn new(column_schemas: Vec) -> Schema { - // Builder won't fail + // Builder won't fail in this case SchemaBuilder::try_from(column_schemas) .unwrap() .build() .unwrap() } + /// Try to Create a schema from a vector of [ColumnSchema]. pub fn try_new(column_schemas: Vec) -> Result { - // Builder won't fail - Ok(SchemaBuilder::try_from(column_schemas)?.build().unwrap()) + SchemaBuilder::try_from(column_schemas)?.build() } #[inline] @@ -239,8 +242,8 @@ impl SchemaBuilder { /// Set timestamp index. /// /// The validation of timestamp column is done in `build()`. - pub fn timestamp_index(mut self, timestamp_index: usize) -> Self { - self.timestamp_index = Some(timestamp_index); + pub fn timestamp_index(mut self, timestamp_index: Option) -> Self { + self.timestamp_index = timestamp_index; self } @@ -547,7 +550,10 @@ mod tests { assert_eq!(0, schema.num_columns()); assert!(schema.is_empty()); - assert!(SchemaBuilder::default().timestamp_index(0).build().is_err()); + assert!(SchemaBuilder::default() + .timestamp_index(Some(0)) + .build() + .is_err()); } #[test] @@ -600,7 +606,7 @@ mod tests { ]; let schema = SchemaBuilder::try_from(column_schemas.clone()) .unwrap() - .timestamp_index(1) + .timestamp_index(Some(1)) .version(123) .build() .unwrap(); @@ -622,17 +628,17 @@ mod tests { ]; assert!(SchemaBuilder::try_from(column_schemas.clone()) .unwrap() - .timestamp_index(0) + .timestamp_index(Some(0)) .build() .is_err()); assert!(SchemaBuilder::try_from(column_schemas.clone()) .unwrap() - .timestamp_index(1) + .timestamp_index(Some(1)) .build() .is_err()); assert!(SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(1) + .timestamp_index(Some(1)) .build() .is_err()); } diff --git a/src/datatypes/src/schema/raw.rs b/src/datatypes/src/schema/raw.rs new file mode 100644 index 0000000000..1845f8b3ee --- /dev/null +++ b/src/datatypes/src/schema/raw.rs @@ -0,0 +1,60 @@ +use serde::{Deserialize, Serialize}; + +use crate::error::{Error, Result}; +use crate::schema::{ColumnSchema, Schema, SchemaBuilder}; + +/// Struct used to serialize and deserialize [`Schema`](crate::schema::Schema). +/// +/// This struct only contains necessary data to recover the Schema. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RawSchema { + pub column_schemas: Vec, + pub timestamp_index: Option, + pub version: u32, +} + +impl TryFrom for Schema { + type Error = Error; + + fn try_from(raw: RawSchema) -> Result { + SchemaBuilder::try_from(raw.column_schemas)? + .timestamp_index(raw.timestamp_index) + .version(raw.version) + .build() + } +} + +impl From<&Schema> for RawSchema { + fn from(schema: &Schema) -> RawSchema { + RawSchema { + column_schemas: schema.column_schemas.clone(), + timestamp_index: schema.timestamp_index, + version: schema.version, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::data_type::ConcreteDataType; + + #[test] + fn test_raw_convert() { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ]; + let schema = SchemaBuilder::try_from(column_schemas) + .unwrap() + .timestamp_index(Some(1)) + .version(123) + .build() + .unwrap(); + + let raw = RawSchema::from(&schema); + let schema_new = Schema::try_from(raw).unwrap(); + + assert_eq!(schema, schema_new); + } +} diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 073601db73..fec37e6667 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -217,7 +217,7 @@ fn build_scripts_schema() -> Schema { // Schema is always valid here SchemaBuilder::try_from(cols) .unwrap() - .timestamp_index(3) + .timestamp_index(Some(3)) .build() .unwrap() } diff --git a/src/storage/benches/memtable/util/schema_util.rs b/src/storage/benches/memtable/util/schema_util.rs index 421c4675af..073675365e 100644 --- a/src/storage/benches/memtable/util/schema_util.rs +++ b/src/storage/benches/memtable/util/schema_util.rs @@ -7,7 +7,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool); pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option) -> Schema { - let column_schemas = column_defs + let column_schemas: Vec<_> = column_defs .iter() .map(|column_def| { let datatype = column_def.1.data_type(); @@ -15,15 +15,11 @@ pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option) -> }) .collect(); - if let Some(index) = timestamp_index { - SchemaBuilder::try_from(column_schemas) - .unwrap() - .timestamp_index(index) - .build() - .unwrap() - } else { - Schema::new(column_schemas) - } + SchemaBuilder::try_from(column_schemas) + .unwrap() + .timestamp_index(timestamp_index) + .build() + .unwrap() } pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option) -> SchemaRef { diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs index 7893aa9110..6aefe757a5 100644 --- a/src/storage/src/proto/write_batch.rs +++ b/src/storage/src/proto/write_batch.rs @@ -78,16 +78,14 @@ impl TryFrom for schema::SchemaRef { .map(schema::ColumnSchema::try_from) .collect::>>()?; - let schema: schema::SchemaRef = match schema.timestamp_index { - Some(index) => Arc::new( - schema::SchemaBuilder::try_from(column_schemas) - .context(ConvertSchemaSnafu)? - .timestamp_index(index.value as usize) - .build() - .context(ConvertSchemaSnafu)?, - ), - None => Arc::new(schema::Schema::try_new(column_schemas).context(ConvertSchemaSnafu)?), - }; + let timestamp_index = schema.timestamp_index.map(|index| index.value as usize); + let schema = Arc::new( + schema::SchemaBuilder::try_from(column_schemas) + .context(ConvertSchemaSnafu)? + .timestamp_index(timestamp_index) + .build() + .context(ConvertSchemaSnafu)?, + ); Ok(schema) } diff --git a/src/storage/src/schema/projected.rs b/src/storage/src/schema/projected.rs index 81d92070fd..075b949bf8 100644 --- a/src/storage/src/schema/projected.rs +++ b/src/storage/src/schema/projected.rs @@ -235,14 +235,12 @@ impl ProjectedSchema { }) .collect(); - let mut builder = SchemaBuilder::try_from(column_schemas) + let schema = SchemaBuilder::try_from(column_schemas) .context(metadata::ConvertSchemaSnafu)? - .version(region_schema.version()); - if let Some(timestamp_index) = timestamp_index { - builder = builder.timestamp_index(timestamp_index); - } - - let schema = builder.build().context(metadata::InvalidSchemaSnafu)?; + .timestamp_index(timestamp_index) + .version(region_schema.version()) + .build() + .context(metadata::InvalidSchemaSnafu)?; Ok(Arc::new(schema)) } diff --git a/src/storage/src/schema/region.rs b/src/storage/src/schema/region.rs index 29bb84cb84..e5cb36cb08 100644 --- a/src/storage/src/schema/region.rs +++ b/src/storage/src/schema/region.rs @@ -134,7 +134,7 @@ fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result SchemaBuilder::try_from(column_schemas) .context(metadata::ConvertSchemaSnafu)? - .timestamp_index(columns.timestamp_key_index()) + .timestamp_index(Some(columns.timestamp_key_index())) .version(version) .build() .context(metadata::InvalidSchemaSnafu) diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs index c65ac6781a..2ac5856059 100644 --- a/src/storage/src/schema/store.rs +++ b/src/storage/src/schema/store.rs @@ -94,7 +94,7 @@ impl StoreSchema { let schema = SchemaBuilder::try_from(column_schemas) .context(metadata::ConvertSchemaSnafu)? - .timestamp_index(timestamp_key_index) + .timestamp_index(Some(timestamp_key_index)) .version(version) .add_metadata(ROW_KEY_END_KEY, row_key_end.to_string()) .add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string()) @@ -252,7 +252,7 @@ mod tests { let expect_schema = SchemaBuilder::try_from(column_schemas) .unwrap() .version(123) - .timestamp_index(1) + .timestamp_index(Some(1)) .build() .unwrap(); // Only compare column schemas since SchemaRef in StoreSchema also contains other metadata that only used diff --git a/src/storage/src/test_util/schema_util.rs b/src/storage/src/test_util/schema_util.rs index d3161971e7..1a2698b24d 100644 --- a/src/storage/src/test_util/schema_util.rs +++ b/src/storage/src/test_util/schema_util.rs @@ -23,13 +23,12 @@ pub fn new_schema_with_version( }) .collect(); - let mut builder = SchemaBuilder::try_from(column_schemas) + SchemaBuilder::try_from(column_schemas) + .unwrap() + .timestamp_index(timestamp_index) + .version(version) + .build() .unwrap() - .version(version); - if let Some(index) = timestamp_index { - builder = builder.timestamp_index(index); - } - builder.build().unwrap() } pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option) -> SchemaRef { diff --git a/src/storage/src/write_batch/compat.rs b/src/storage/src/write_batch/compat.rs index a456448009..18268b370c 100644 --- a/src/storage/src/write_batch/compat.rs +++ b/src/storage/src/write_batch/compat.rs @@ -109,7 +109,7 @@ mod tests { SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(1) + .timestamp_index(Some(1)) } fn new_test_schema(v0_constraint: Option>) -> SchemaRef { diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index d429b3be43..beb3ea9578 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -464,7 +464,7 @@ mod tests { let schema = Arc::new( SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(2) + .timestamp_index(Some(2)) .build() .expect("ts must be timestamp column"), ); diff --git a/src/table-engine/src/error.rs b/src/table-engine/src/error.rs index 12194591df..219f35a278 100644 --- a/src/table-engine/src/error.rs +++ b/src/table-engine/src/error.rs @@ -178,6 +178,15 @@ pub enum Error { #[snafu(backtrace)] source: datatypes::error::Error, }, + + #[snafu(display( + "Failed to convert metadata from deserialized data, source: {}", + source + ))] + ConvertRaw { + #[snafu(backtrace)] + source: table::metadata::ConvertError, + }, } impl From for table::error::Error { @@ -215,7 +224,7 @@ impl ErrorExt for Error { ColumnsNotExist { .. } => StatusCode::TableColumnNotFound, - TableInfoNotFound { .. } => StatusCode::Unexpected, + TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected, ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable, } diff --git a/src/table-engine/src/manifest.rs b/src/table-engine/src/manifest.rs index c254a238be..522ed3206c 100644 --- a/src/table-engine/src/manifest.rs +++ b/src/table-engine/src/manifest.rs @@ -12,7 +12,7 @@ mod tests { use storage::manifest::MetaActionIteratorImpl; use store_api::manifest::action::ProtocolAction; use store_api::manifest::{Manifest, MetaActionIterator}; - use table::metadata::TableInfo; + use table::metadata::{RawTableInfo, TableInfo}; use super::*; use crate::manifest::action::{TableChange, TableMetaAction, TableRemove}; @@ -32,7 +32,7 @@ mod tests { matches!(&action_list.actions[0], TableMetaAction::Protocol(p) if *p == *protocol) ); assert!( - matches!(&action_list.actions[1], TableMetaAction::Change(c) if c.table_info == *table_info) + matches!(&action_list.actions[1], TableMetaAction::Change(c) if TableInfo::try_from(c.table_info.clone()).unwrap() == *table_info) ); } _ => unreachable!(), @@ -52,7 +52,7 @@ mod tests { let table_info = test_util::build_test_table_info(); let action_list = TableMetaActionList::new(vec![TableMetaAction::Change(Box::new(TableChange { - table_info: table_info.clone(), + table_info: RawTableInfo::from(table_info.clone()), }))]); assert_eq!(0, manifest.update(action_list).await.unwrap()); diff --git a/src/table-engine/src/manifest/action.rs b/src/table-engine/src/manifest/action.rs index 87e074f497..ce35f6dea5 100644 --- a/src/table-engine/src/manifest/action.rs +++ b/src/table-engine/src/manifest/action.rs @@ -11,11 +11,11 @@ use storage::manifest::helper; use store_api::manifest::action::{ProtocolAction, ProtocolVersion, VersionHeader}; use store_api::manifest::ManifestVersion; use store_api::manifest::MetaAction; -use table::metadata::{TableIdent, TableInfo}; +use table::metadata::{RawTableInfo, TableIdent}; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct TableChange { - pub table_info: TableInfo, + pub table_info: RawTableInfo, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -137,7 +137,7 @@ mod tests { let mut protocol = ProtocolAction::new(); protocol.min_reader_version = 1; - let table_info = test_util::build_test_table_info(); + let table_info = RawTableInfo::from(test_util::build_test_table_info()); let mut action_list = TableMetaActionList::new(vec![ TableMetaAction::Protocol(protocol.clone()), diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 04d1471bbe..2a6e239970 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -28,7 +28,7 @@ use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder}; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest}; use table::table::scan::SimpleTableScan; use table::{ - metadata::{TableInfo, TableType}, + metadata::{RawTableInfo, TableInfo, TableType}, table::Table, }; use tokio::sync::Mutex; @@ -288,7 +288,7 @@ impl Table for MitoTable { self.manifest .update(TableMetaActionList::with_action(TableMetaAction::Change( Box::new(TableChange { - table_info: new_info.clone(), + table_info: RawTableInfo::from(new_info.clone()), }), ))) .await @@ -339,11 +339,9 @@ fn build_table_schema_with_new_columns( .context(SchemaBuildSnafu { msg: "Failed to convert column schemas into table schema", })? + .timestamp_index(table_schema.timestamp_index()) .version(table_schema.version() + 1); - if let Some(index) = table_schema.timestamp_index() { - builder = builder.timestamp_index(index); - } for (k, v) in table_schema.arrow_schema().metadata.iter() { builder = builder.add_metadata(k, v); } @@ -455,7 +453,7 @@ impl MitoTable { let _manifest_version = manifest .update(TableMetaActionList::with_action(TableMetaAction::Change( Box::new(TableChange { - table_info: table_info.clone(), + table_info: RawTableInfo::from(table_info.clone()), }), ))) .await @@ -515,10 +513,12 @@ impl MitoTable { for action in action_list.actions { match action { TableMetaAction::Change(c) => { - table_info = Some(c.table_info); + table_info = Some( + TableInfo::try_from(c.table_info).context(error::ConvertRawSnafu)?, + ); } TableMetaAction::Protocol(_) => {} - _ => unimplemented!(), + TableMetaAction::Remove(_) => unimplemented!("Drop table is unimplemented"), } } } diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index 2d57422cf0..dd4a3ac3bb 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -38,7 +38,7 @@ pub fn schema_for_test() -> Schema { SchemaBuilder::try_from(column_schemas) .unwrap() - .timestamp_index(3) + .timestamp_index(Some(3)) .build() .expect("ts must be timestamp column") } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index fd00114783..9212f6265a 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -2,7 +2,8 @@ use std::collections::HashMap; use std::sync::Arc; use chrono::{DateTime, Utc}; -use datatypes::schema::SchemaRef; +pub use datatypes::error::{Error as ConvertError, Result as ConvertResult}; +use datatypes::schema::{RawSchema, Schema, SchemaRef}; use derive_builder::Builder; use serde::{Deserialize, Serialize}; use store_api::storage::ColumnId; @@ -38,13 +39,14 @@ pub enum TableType { Temporary, } +/// Identifier of the table. #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)] pub struct TableIdent { pub table_id: TableId, pub version: TableVersion, } -#[derive(Serialize, Deserialize, Clone, Debug, Builder, PartialEq)] +#[derive(Clone, Debug, Builder, PartialEq)] #[builder(pattern = "mutable")] pub struct TableMeta { pub schema: SchemaRef, @@ -54,8 +56,10 @@ pub struct TableMeta { #[builder(default, setter(into))] pub engine: String, pub next_column_id: ColumnId, + /// Options for table engine. #[builder(default)] pub engine_options: HashMap, + /// Table options. #[builder(default)] pub options: HashMap, #[builder(default = "Utc::now()")] @@ -92,13 +96,14 @@ impl TableMeta { } } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Builder)] +#[derive(Clone, Debug, PartialEq, Builder)] #[builder(pattern = "owned")] pub struct TableInfo { #[builder(default, setter(into))] pub ident: TableIdent, #[builder(setter(into))] pub name: String, + /// Comment of the table. #[builder(default, setter(into))] pub desc: Option, #[builder(default, setter(into))] @@ -121,15 +126,15 @@ impl TableInfoBuilder { } } - pub fn table_id(mut self, id: impl Into) -> Self { + pub fn table_id(mut self, id: TableId) -> Self { let ident = self.ident.get_or_insert_with(TableIdent::default); - ident.table_id = id.into(); + ident.table_id = id; self } - pub fn table_version(mut self, version: impl Into) -> Self { + pub fn table_version(mut self, version: TableVersion) -> Self { let ident = self.ident.get_or_insert_with(TableIdent::default); - ident.version = version.into(); + ident.version = version; self } } @@ -148,3 +153,136 @@ impl From for TableIdent { Self::new(table_id) } } + +/// Struct used to serialize and deserialize [`TableMeta`]. +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct RawTableMeta { + pub schema: RawSchema, + pub primary_key_indices: Vec, + pub value_indices: Vec, + pub engine: String, + pub next_column_id: ColumnId, + pub engine_options: HashMap, + pub options: HashMap, + pub created_on: DateTime, +} + +impl From for RawTableMeta { + fn from(meta: TableMeta) -> RawTableMeta { + RawTableMeta { + schema: RawSchema::from(&*meta.schema), + primary_key_indices: meta.primary_key_indices, + value_indices: meta.value_indices, + engine: meta.engine, + next_column_id: meta.next_column_id, + engine_options: meta.engine_options, + options: meta.options, + created_on: meta.created_on, + } + } +} + +impl TryFrom for TableMeta { + type Error = ConvertError; + + fn try_from(raw: RawTableMeta) -> ConvertResult { + Ok(TableMeta { + schema: Arc::new(Schema::try_from(raw.schema)?), + primary_key_indices: raw.primary_key_indices, + value_indices: raw.value_indices, + engine: raw.engine, + next_column_id: raw.next_column_id, + engine_options: raw.engine_options, + options: raw.options, + created_on: raw.created_on, + }) + } +} + +/// Struct used to serialize and deserialize [`TableInfo`]. +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct RawTableInfo { + pub ident: TableIdent, + pub name: String, + pub desc: Option, + pub catalog_name: String, + pub schema_name: String, + pub meta: RawTableMeta, + pub table_type: TableType, +} + +impl From for RawTableInfo { + fn from(info: TableInfo) -> RawTableInfo { + RawTableInfo { + ident: info.ident, + name: info.name, + desc: info.desc, + catalog_name: info.catalog_name, + schema_name: info.schema_name, + meta: RawTableMeta::from(info.meta), + table_type: info.table_type, + } + } +} + +impl TryFrom for TableInfo { + type Error = ConvertError; + + fn try_from(raw: RawTableInfo) -> ConvertResult { + Ok(TableInfo { + ident: raw.ident, + name: raw.name, + desc: raw.desc, + catalog_name: raw.catalog_name, + schema_name: raw.schema_name, + meta: TableMeta::try_from(raw.meta)?, + table_type: raw.table_type, + }) + } +} + +#[cfg(test)] +mod tests { + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder}; + + use super::*; + + fn new_test_schema() -> Schema { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false), + ]; + SchemaBuilder::try_from(column_schemas) + .unwrap() + .timestamp_index(Some(1)) + .version(123) + .build() + .unwrap() + } + + #[test] + fn test_raw_convert() { + let schema = Arc::new(new_test_schema()); + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![1]) + .value_indices(vec![0]) + .engine("engine") + .next_column_id(2) + .build() + .unwrap(); + let info = TableInfoBuilder::default() + .table_id(10) + .table_version(5) + .name("mytable") + .meta(meta) + .build() + .unwrap(); + + let raw = RawTableInfo::from(info.clone()); + let info_new = TableInfo::try_from(raw).unwrap(); + + assert_eq!(info, info_new); + } +}