diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 6a053fc469..811628c1ff 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -19,7 +19,6 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::prelude::{Snafu, StatusCode}; use datafusion::error::DataFusionError; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::RawSchema; use snafu::{Backtrace, ErrorCompat}; use crate::DeregisterTableRequest; @@ -162,19 +161,6 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display( - "Invalid table schema in catalog entry, table:{}, schema: {:?}, source: {}", - table_info, - schema, - source - ))] - InvalidTableSchema { - table_info: String, - schema: RawSchema, - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display("Failure during SchemaProvider operation, source: {}", source))] SchemaProviderOperation { #[snafu(backtrace)] @@ -254,8 +240,7 @@ impl ErrorExt for Error { Error::MetaSrv { source, .. } => source.status_code(), Error::SystemCatalogTableScan { source } => source.status_code(), Error::SystemCatalogTableScanExec { source } => source.status_code(), - Error::InvalidTableSchema { source, .. } - | Error::InvalidTableInfoInCatalog { source } => source.status_code(), + Error::InvalidTableInfoInCatalog { source } => source.status_code(), Error::SchemaProviderOperation { source } | Error::Internal { source } => { source.status_code() } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index ce7d24b621..1c571159c8 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -32,8 +32,8 @@ use table::TableRef; use tokio::sync::Mutex; use crate::error::{ - CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu, - OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu, + CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, Result, + SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu, }; use crate::helper::{ build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, @@ -346,21 +346,13 @@ impl RemoteCatalogManager { ); let meta = &table_info.meta; - let schema = meta - .schema - .clone() - .try_into() - .context(InvalidTableSchemaSnafu { - table_info: format!("{catalog_name}.{schema_name}.{table_name}"), - schema: meta.schema.clone(), - })?; let req = CreateTableRequest { id: table_id, catalog_name: catalog_name.clone(), schema_name: schema_name.clone(), table_name: table_name.clone(), desc: None, - schema: Arc::new(schema), + schema: meta.schema.clone(), region_numbers: region_numbers.clone(), primary_key_indices: meta.primary_key_indices.clone(), create_if_not_exists: true, diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 2ac8540783..23542d6be0 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -26,7 +26,7 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use common_time::util; use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef}; -use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; +use datatypes::schema::{ColumnSchema, RawSchema, SchemaRef}; use datatypes::vectors::{BinaryVector, TimestampMillisecondVector, UInt8Vector}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; @@ -88,7 +88,7 @@ impl SystemCatalogTable { table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), table_id: SYSTEM_CATALOG_TABLE_ID, }; - let schema = Arc::new(build_system_catalog_schema()); + let schema = build_system_catalog_schema(); let ctx = EngineContext::default(); if let Some(table) = engine @@ -105,7 +105,7 @@ impl SystemCatalogTable { schema_name: INFORMATION_SCHEMA_NAME.to_string(), table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), desc: Some("System catalog table".to_string()), - schema: schema.clone(), + schema, region_numbers: vec![0], primary_key_indices: vec![ENTRY_TYPE_INDEX, KEY_INDEX], create_if_not_exists: true, @@ -143,7 +143,7 @@ impl SystemCatalogTable { /// - value: JSON-encoded value of entry's metadata. /// - gmt_created: create time of this metadata. /// - gmt_modified: last updated time of this metadata. -fn build_system_catalog_schema() -> Schema { +fn build_system_catalog_schema() -> RawSchema { let cols = vec![ ColumnSchema::new( "entry_type".to_string(), @@ -178,8 +178,7 @@ fn build_system_catalog_schema() -> Schema { ), ]; - // The schema of this table must be valid. - SchemaBuilder::try_from(cols).unwrap().build().unwrap() + RawSchema::new(cols) } /// Formats key string for table entry in system catalog diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index 7df4231116..32e8a49e56 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -28,7 +28,7 @@ mod tests { }; use catalog::{CatalogList, CatalogManager, RegisterTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use datatypes::schema::Schema; + use datatypes::schema::RawSchema; use futures_util::StreamExt; use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; @@ -116,7 +116,7 @@ mod tests { let schema_name = "nonexistent_schema".to_string(); let table_name = "fail_table".to_string(); // this schema has no effect - let table_schema = Arc::new(Schema::new(vec![])); + let table_schema = RawSchema::new(vec![]); let table = table_engine .create_table( &EngineContext {}, @@ -126,7 +126,7 @@ mod tests { schema_name: schema_name.clone(), table_name: table_name.clone(), desc: None, - schema: table_schema.clone(), + schema: table_schema, region_numbers: vec![0], primary_key_indices: vec![], create_if_not_exists: false, @@ -176,7 +176,7 @@ mod tests { let table_name = "test_table".to_string(); let table_id = 1; // this schema has no effect - let table_schema = Arc::new(Schema::new(vec![])); + let table_schema = RawSchema::new(vec![]); let table = table_engine .create_table( &EngineContext {}, @@ -186,7 +186,7 @@ mod tests { schema_name: schema_name.clone(), table_name: table_name.clone(), desc: None, - schema: table_schema.clone(), + schema: table_schema, region_numbers: vec![0], primary_key_indices: vec![], create_if_not_exists: false, @@ -246,7 +246,7 @@ mod tests { schema_name: schema_name.clone(), table_name: "".to_string(), desc: None, - schema: Arc::new(Schema::new(vec![])), + schema: RawSchema::new(vec![]), region_numbers: vec![0], primary_key_indices: vec![], create_if_not_exists: false, diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index 5c450253d5..1ebc38bf0d 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -12,19 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use api::v1::alter_expr::Kind; use api::v1::{column_def, AlterExpr, CreateTableExpr, DropColumns, RenameTable}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; +use datatypes::schema::{ColumnSchema, RawSchema}; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableId; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest}; use crate::error::{ - ColumnNotFoundSnafu, CreateSchemaSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, - MissingTimestampColumnSnafu, Result, + ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, + Result, }; /// Convert an [`AlterExpr`] to an [`AlterTableRequest`] @@ -92,7 +90,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result { } } -pub fn create_table_schema(expr: &CreateTableExpr) -> Result { +pub fn create_table_schema(expr: &CreateTableExpr) -> Result { let column_schemas = expr .column_defs .iter() @@ -121,12 +119,7 @@ pub fn create_table_schema(expr: &CreateTableExpr) -> Result { }) .collect::>(); - Ok(Arc::new( - SchemaBuilder::try_from(column_schemas) - .context(CreateSchemaSnafu)? - .build() - .context(CreateSchemaSnafu)?, - )) + Ok(RawSchema::new(column_schemas)) } pub fn create_expr_to_request( @@ -138,8 +131,11 @@ pub fn create_expr_to_request( .primary_keys .iter() .map(|key| { + // We do a linear search here. schema - .column_index_by_name(key) + .column_schemas + .iter() + .position(|column_schema| column_schema.name == *key) .context(ColumnNotFoundSnafu { column_name: key, table_name: &expr.table_name, diff --git a/src/common/grpc-expr/src/error.rs b/src/common/grpc-expr/src/error.rs index 5dbf9a2c12..51a5be55a2 100644 --- a/src/common/grpc-expr/src/error.rs +++ b/src/common/grpc-expr/src/error.rs @@ -40,12 +40,6 @@ pub enum Error { source: api::error::Error, }, - #[snafu(display("Failed to create schema when creating table, source: {}", source))] - CreateSchema { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display( "Duplicated timestamp column in gRPC requests, exists {}, duplicated: {}", exists, @@ -102,9 +96,9 @@ impl ErrorExt for Error { StatusCode::InvalidArguments } Error::ColumnDataType { .. } => StatusCode::Internal, - Error::CreateSchema { .. } - | Error::DuplicatedTimestampColumn { .. } - | Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments, + Error::DuplicatedTimestampColumn { .. } | Error::MissingTimestampColumn { .. } => { + StatusCode::InvalidArguments + } Error::InvalidColumnProto { .. } => StatusCode::InvalidArguments, Error::CreateVector { .. } => StatusCode::InvalidArguments, Error::MissingField { .. } => StatusCode::InvalidArguments, diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index e605af151f..3357bc2075 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -522,7 +522,7 @@ mod test { use catalog::{CatalogList, CatalogProvider, RegisterTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datafusion::common::{DFSchema, ToDFSchema}; - use datatypes::schema::Schema; + use datatypes::schema::RawSchema; use table::requests::CreateTableRequest; use table::test_util::{EmptyTable, MockTableEngine}; @@ -558,7 +558,7 @@ mod test { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), desc: None, - schema: Arc::new(Schema::new(supported_types())), + schema: RawSchema::new(supported_types()), region_numbers: vec![0], primary_key_indices: vec![], create_if_not_exists: true, diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index c38c6982f2..208d9eca25 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -177,12 +177,6 @@ pub enum Error { #[snafu(display("Not support SQL, error: {}", msg))] NotSupportSql { msg: String }, - #[snafu(display("Failed to create schema when creating table, source: {}", source))] - CreateSchema { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display("Failed to convert datafusion schema, source: {}", source))] ConvertSchema { #[snafu(backtrace)] @@ -370,9 +364,9 @@ impl ErrorExt for Error { | Error::CreateExprToRequest { source } | Error::InsertData { source } => source.status_code(), - Error::CreateSchema { source, .. } - | Error::ConvertSchema { source, .. } - | Error::VectorComputation { source } => source.status_code(), + Error::ConvertSchema { source, .. } | Error::VectorComputation { source } => { + source.status_code() + } Error::ColumnValuesNumberMismatch { .. } | Error::InvalidSql { .. } diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 053b673052..c541912de6 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -86,13 +86,11 @@ impl Instance { #[cfg(test)] mod tests { - use std::sync::Arc; - use api::v1::{column_def, ColumnDataType, ColumnDef, TableId}; use common_catalog::consts::MIN_USER_TABLE_ID; use common_grpc_expr::create_table_schema; use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef}; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema}; use datatypes::value::Value; use super::*; @@ -224,7 +222,7 @@ mod tests { } } - fn expected_table_schema() -> SchemaRef { + fn expected_table_schema() -> RawSchema { let column_schemas = vec![ ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), ColumnSchema::new( @@ -236,11 +234,7 @@ mod tests { ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), ]; - Arc::new( - SchemaBuilder::try_from(column_schemas) - .unwrap() - .build() - .unwrap(), - ) + + RawSchema::new(column_schemas) } } diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index a983d42f3c..299aa19606 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -13,13 +13,12 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; use catalog::{RegisterSchemaRequest, RegisterTableRequest}; use common_query::Output; use common_telemetry::tracing::info; use common_telemetry::tracing::log::error; -use datatypes::schema::SchemaBuilder; +use datatypes::schema::RawSchema; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::{ColumnOption, TableConstraint}; @@ -31,8 +30,8 @@ use table::metadata::TableId; use table::requests::*; use crate::error::{ - self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateSchemaSnafu, - CreateTableSnafu, IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu, + self, CatalogNotFoundSnafu, CatalogSnafu, ConstraintNotSupportedSnafu, CreateTableSnafu, + IllegalPrimaryKeysDefSnafu, InsertSystemCatalogSnafu, KeyColumnNotFoundSnafu, RegisterSchemaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, }; use crate::sql::SqlHandler; @@ -239,13 +238,7 @@ impl SqlHandler { }) .collect::>>()?; - let schema = Arc::new( - SchemaBuilder::try_from(columns_schemas) - .context(CreateSchemaSnafu)? - .build() - .context(CreateSchemaSnafu)?, - ); - + let schema = RawSchema::new(columns_schemas); let request = CreateTableRequest { id: table_id, catalog_name: table_ref.catalog.to_string(), @@ -267,6 +260,7 @@ mod tests { use std::assert_matches::assert_matches; use datatypes::prelude::ConcreteDataType; + use datatypes::schema::Schema; use sql::dialect::GenericDialect; use sql::parser::ParserContext; use sql::statements::statement::Statement; @@ -320,8 +314,8 @@ mod tests { assert_eq!(42, c.id); assert!(!c.create_if_not_exists); assert_eq!(vec![0], c.primary_key_indices); - assert_eq!(1, c.schema.timestamp_index().unwrap()); - assert_eq!(4, c.schema.column_schemas().len()); + assert_eq!(1, c.schema.timestamp_index.unwrap()); + assert_eq!(4, c.schema.column_schemas.len()); } #[tokio::test] @@ -371,7 +365,7 @@ mod tests { .create_to_request(42, parsed_stmt, &TableReference::bare("demo_table")) .unwrap(); assert!(c.primary_key_indices.is_empty()); - assert_eq!(c.schema.timestamp_index(), Some(1)); + assert_eq!(c.schema.timestamp_index, Some(1)); } /// Constraints specified, not column cannot be found. @@ -438,40 +432,25 @@ mod tests { assert_eq!("s".to_string(), request.schema_name); assert_eq!("demo".to_string(), request.table_name); assert!(!request.create_if_not_exists); - assert_eq!(4, request.schema.column_schemas().len()); + assert_eq!(4, request.schema.column_schemas.len()); assert_eq!(vec![0], request.primary_key_indices); + let schema = Schema::try_from(request.schema).unwrap(); assert_eq!( ConcreteDataType::string_datatype(), - request - .schema - .column_schema_by_name("host") - .unwrap() - .data_type + schema.column_schema_by_name("host").unwrap().data_type ); assert_eq!( ConcreteDataType::timestamp_millisecond_datatype(), - request - .schema - .column_schema_by_name("ts") - .unwrap() - .data_type + schema.column_schema_by_name("ts").unwrap().data_type ); assert_eq!( ConcreteDataType::float64_datatype(), - request - .schema - .column_schema_by_name("cpu") - .unwrap() - .data_type + schema.column_schema_by_name("cpu").unwrap().data_type ); assert_eq!( ConcreteDataType::float64_datatype(), - request - .schema - .column_schema_by_name("memory") - .unwrap() - .data_type + schema.column_schema_by_name("memory").unwrap().data_type ); } } diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index fef0e5df09..5365ad103c 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -19,7 +19,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER use common_query::Output; use common_recordbatch::util; use datatypes::data_type::ConcreteDataType; -use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use datatypes::schema::{ColumnSchema, RawSchema}; use mito::config::EngineConfig; use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; use query::QueryEngineFactory; @@ -104,12 +104,7 @@ pub(crate) async fn create_test_table( schema_name: "public".to_string(), table_name: table_name.to_string(), desc: Some(" a test table".to_string()), - schema: Arc::new( - SchemaBuilder::try_from(column_schemas) - .unwrap() - .build() - .expect("ts is expected to be timestamp column"), - ), + schema: RawSchema::new(column_schemas), create_if_not_exists: true, primary_key_indices: vec![0], // "host" is in primary keys table_options: HashMap::new(), diff --git a/src/datatypes/src/schema/raw.rs b/src/datatypes/src/schema/raw.rs index ab94e9ad8f..d952bb9f3c 100644 --- a/src/datatypes/src/schema/raw.rs +++ b/src/datatypes/src/schema/raw.rs @@ -22,15 +22,38 @@ use crate::schema::{ColumnSchema, Schema, SchemaBuilder}; /// This struct only contains necessary data to recover the Schema. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RawSchema { + /// Schema of columns. pub column_schemas: Vec, + /// Index of the timestamp column. pub timestamp_index: Option, + /// Schema version. pub version: u32, } +impl RawSchema { + /// Creates a new [RawSchema] from specific `column_schemas`. + /// + /// Sets [RawSchema::timestamp_index] to the first index of the timestamp + /// column. It doesn't check whether time index column is duplicate. + pub fn new(column_schemas: Vec) -> RawSchema { + let timestamp_index = column_schemas + .iter() + .position(|column_schema| column_schema.is_time_index()); + + RawSchema { + column_schemas, + timestamp_index, + version: 0, + } + } +} + impl TryFrom for Schema { type Error = Error; fn try_from(raw: RawSchema) -> Result { + // While building Schema, we don't trust the fields, such as timestamp_index, + // in RawSchema. We use SchemaBuilder to perform the validation. SchemaBuilder::try_from(raw.column_schemas)? .version(raw.version) .build() @@ -74,4 +97,33 @@ mod tests { assert_eq!(schema, schema_new); } + + #[test] + fn test_new_raw_schema_with_time_index() { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ]; + let schema = RawSchema::new(column_schemas); + assert_eq!(1, schema.timestamp_index.unwrap()); + } + + #[test] + fn test_new_raw_schema_without_time_index() { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + ]; + let schema = RawSchema::new(column_schemas); + assert!(schema.timestamp_index.is_none()); + } } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 73cb27a17e..91278eec6c 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_telemetry::tracing::log::info; use common_telemetry::{debug, logging}; -use datatypes::schema::SchemaRef; +use datatypes::schema::{Schema, SchemaRef}; use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ @@ -42,8 +42,8 @@ use tokio::sync::Mutex; use crate::config::EngineConfig; use crate::error::{ self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, - BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, MissingTimestampIndexSnafu, - RegionNotFoundSnafu, Result, TableExistsSnafu, + BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, InvalidRawSchemaSnafu, + MissingTimestampIndexSnafu, RegionNotFoundSnafu, Result, TableExistsSnafu, }; use crate::manifest::TableManifest; use crate::table::MitoTable; @@ -274,7 +274,7 @@ fn build_column_family( fn validate_create_table_request(request: &CreateTableRequest) -> Result<()> { let ts_index = request .schema - .timestamp_index() + .timestamp_index .context(MissingTimestampIndexSnafu { table_name: &request.table_name, })?; @@ -320,18 +320,19 @@ impl MitoEngineInner { } } - let table_schema = &request.schema; + let table_schema = + Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?); let primary_key_indices = &request.primary_key_indices; let (next_column_id, default_cf) = build_column_family( INIT_COLUMN_ID, table_name, - table_schema, + &table_schema, primary_key_indices, )?; let (next_column_id, row_key) = build_row_key_desc( next_column_id, table_name, - table_schema, + &table_schema, primary_key_indices, )?; @@ -378,7 +379,7 @@ impl MitoEngineInner { } let table_meta = TableMetaBuilder::default() - .schema(request.schema) + .schema(table_schema) .engine(MITO_ENGINE) .next_column_id(next_column_id) .primary_key_indices(request.primary_key_indices.clone()) @@ -599,7 +600,7 @@ mod tests { use common_query::physical_plan::SessionContext; use common_recordbatch::util; use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder}; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema}; use datatypes::value::Value; use datatypes::vectors::{ Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef, @@ -635,12 +636,7 @@ mod tests { .with_time_index(true), ]; - let schema = Arc::new( - SchemaBuilder::try_from(column_schemas) - .unwrap() - .build() - .expect("ts must be timestamp column"), - ); + let schema = RawSchema::new(column_schemas); let (dir, object_store) = test_util::new_test_object_store("test_insert_with_column_default_constraint").await; @@ -665,7 +661,7 @@ mod tests { schema_name: "public".to_string(), table_name: table_name.to_string(), desc: Some("a test table".to_string()), - schema: schema.clone(), + schema, create_if_not_exists: true, primary_key_indices: Vec::default(), table_options: HashMap::new(), @@ -770,12 +766,7 @@ mod tests { .with_time_index(true), ]; - let schema = Arc::new( - SchemaBuilder::try_from(column_schemas) - .unwrap() - .build() - .expect("ts must be timestamp column"), - ); + let schema = RawSchema::new(column_schemas); let mut request = CreateTableRequest { id: 1, @@ -944,7 +935,7 @@ mod tests { catalog_name: "greptime".to_string(), schema_name: "public".to_string(), table_name: table_info.name.to_string(), - schema: table_info.meta.schema.clone(), + schema: RawSchema::from(&*table_info.meta.schema), create_if_not_exists: true, desc: None, primary_key_indices: Vec::default(), @@ -961,7 +952,7 @@ mod tests { catalog_name: "greptime".to_string(), schema_name: "public".to_string(), table_name: table_info.name.to_string(), - schema: table_info.meta.schema.clone(), + schema: RawSchema::from(&*table_info.meta.schema), create_if_not_exists: false, desc: None, primary_key_indices: Vec::default(), @@ -1170,7 +1161,7 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: another_name.to_string(), desc: Some("another test table".to_string()), - schema: Arc::new(schema_for_test()), + schema: RawSchema::from(&schema_for_test()), region_numbers: vec![0], primary_key_indices: vec![0], create_if_not_exists: true, @@ -1253,7 +1244,7 @@ mod tests { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_info.name.to_string(), - schema: table_info.meta.schema.clone(), + schema: RawSchema::from(&*table_info.meta.schema), create_if_not_exists: true, desc: None, primary_key_indices: Vec::default(), @@ -1286,7 +1277,7 @@ mod tests { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_info.name.to_string(), - schema: table_info.meta.schema.clone(), + schema: RawSchema::from(&*table_info.meta.schema), create_if_not_exists: false, desc: None, primary_key_indices: Vec::default(), diff --git a/src/mito/src/error.rs b/src/mito/src/error.rs index ee1aa15265..03b7266cec 100644 --- a/src/mito/src/error.rs +++ b/src/mito/src/error.rs @@ -184,6 +184,9 @@ pub enum Error { region_name: String, backtrace: Backtrace, }, + + #[snafu(display("Invalid schema, source: {}", source))] + InvalidRawSchema { source: datatypes::error::Error }, } pub type Result = std::result::Result; @@ -207,7 +210,8 @@ impl ErrorExt for Error { | ProjectedColumnNotFound { .. } | InvalidPrimaryKey { .. } | MissingTimestampIndex { .. } - | TableNotFound { .. } => StatusCode::InvalidArguments, + | TableNotFound { .. } + | InvalidRawSchema { .. } => StatusCode::InvalidArguments, TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected, diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index 4b4681f15c..3a31f71965 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; +use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; use datatypes::vectors::VectorRef; use log_store::NoopLogStore; use object_store::services::fs::Builder; @@ -109,7 +109,7 @@ fn new_create_request(schema: SchemaRef) -> CreateTableRequest { schema_name: "public".to_string(), table_name: TABLE_NAME.to_string(), desc: Some("a test table".to_string()), - schema, + schema: RawSchema::from(&*schema), region_numbers: vec![0], create_if_not_exists: true, primary_key_indices: vec![0], diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 7fe267eeab..0f66fc1a23 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -24,7 +24,7 @@ use common_recordbatch::util as record_util; use common_telemetry::logging; use common_time::util; use datatypes::prelude::{ConcreteDataType, ScalarVector}; -use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder}; +use datatypes::schema::{ColumnSchema, RawSchema}; use datatypes::vectors::{StringVector, TimestampMillisecondVector, Vector, VectorRef}; use query::parser::QueryLanguageParser; use query::QueryEngineRef; @@ -50,7 +50,7 @@ impl ScriptsTable { catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, ) -> Result { - let schema = Arc::new(build_scripts_schema()); + let schema = build_scripts_schema(); // TODO(dennis): we put scripts table into default catalog and schema. // maybe put into system catalog? let request = CreateTableRequest { @@ -202,7 +202,7 @@ impl ScriptsTable { } /// Build scripts table -fn build_scripts_schema() -> Schema { +fn build_scripts_schema() -> RawSchema { let cols = vec![ ColumnSchema::new( "schema".to_string(), @@ -242,6 +242,5 @@ fn build_scripts_schema() -> Schema { ), ]; - // Schema is always valid here - SchemaBuilder::try_from(cols).unwrap().build().unwrap() + RawSchema::new(cols) } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 27ff4375ed..f505f65c63 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use datatypes::prelude::VectorRef; -use datatypes::schema::{ColumnSchema, SchemaRef}; +use datatypes::schema::{ColumnSchema, RawSchema}; use store_api::storage::RegionNumber; use crate::metadata::TableId; @@ -45,7 +45,7 @@ pub struct CreateTableRequest { pub schema_name: String, pub table_name: String, pub desc: Option, - pub schema: SchemaRef, + pub schema: RawSchema, pub region_numbers: Vec, pub primary_key_indices: Vec, pub create_if_not_exists: bool, diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs index c0dfcb5821..7fd8168235 100644 --- a/src/table/src/test_util/empty_table.rs +++ b/src/table/src/test_util/empty_table.rs @@ -29,8 +29,9 @@ pub struct EmptyTable { impl EmptyTable { pub fn new(req: CreateTableRequest) -> Self { + let schema = Arc::new(req.schema.try_into().unwrap()); let table_meta = TableMetaBuilder::default() - .schema(req.schema) + .schema(schema) .primary_key_indices(req.primary_key_indices) .next_column_id(0) .options(req.table_options) diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 069a87b589..4679373cf9 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -30,7 +30,7 @@ use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::{Instance, InstanceRef}; use datanode::sql::SqlHandler; use datatypes::data_type::ConcreteDataType; -use datatypes::schema::{ColumnSchema, SchemaBuilder}; +use datatypes::schema::{ColumnSchema, RawSchema}; use frontend::instance::Instance as FeInstance; use object_store::backend::s3; use object_store::services::oss; @@ -236,12 +236,7 @@ pub async fn create_test_table( schema_name: "public".to_string(), table_name: table_name.to_string(), desc: Some(" a test table".to_string()), - schema: Arc::new( - SchemaBuilder::try_from(column_schemas) - .unwrap() - .build() - .expect("ts is expected to be timestamp column"), - ), + schema: RawSchema::new(column_schemas), create_if_not_exists: true, primary_key_indices: vec![0], // "host" is in primary keys table_options: HashMap::new(),