diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index f781423bf4..e209c626a6 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -344,6 +344,9 @@ pub enum Error {
         source: client::Error,
     },
 
+    #[snafu(display("Cannot find primary key column by name: {}", msg))]
+    PrimaryKeyNotFound { msg: String, backtrace: Backtrace },
+
     #[snafu(display("Failed to execute sql: {}, source: {}", sql, source))]
     ExecuteSql {
         sql: String,
@@ -422,7 +425,7 @@ impl ErrorExt for Error {
             Error::Select { source, .. } => source.status_code(),
             Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
             Error::DeserializeInsertBatch { source, .. } => source.status_code(),
-
+            Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
             Error::ExecuteSql { source, .. } => source.status_code(),
         }
     }
diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs
new file mode 100644
index 0000000000..c3a3ef7a8c
--- /dev/null
+++ b/src/frontend/src/expr_factory.rs
@@ -0,0 +1,173 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use api::helper::ColumnDataTypeWrapper;
+use api::v1::codec::InsertBatch;
+use api::v1::{ColumnDataType, CreateExpr};
+use datatypes::schema::ColumnSchema;
+use snafu::{ensure, ResultExt};
+use sql::statements::create::{CreateTable, TIME_INDEX};
+use sql::statements::{column_def_to_schema, table_idents_to_full_name};
+use sqlparser::ast::{ColumnDef, TableConstraint};
+
+use crate::error::InvalidSqlSnafu;
+use crate::error::Result;
+use crate::error::{
+    BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
+    ParseSqlSnafu,
+};
+
+pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;
+
+#[async_trait::async_trait]
+pub trait CreateExprFactory {
+    async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr>;
+
+    async fn create_expr_by_insert_batch(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+        batch: &[InsertBatch],
+    ) -> crate::error::Result<CreateExpr>;
+}
+
+#[derive(Debug)]
+pub struct DefaultCreateExprFactory;
+
+#[async_trait::async_trait]
+impl CreateExprFactory for DefaultCreateExprFactory {
+    async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr> {
+        create_to_expr(None, vec![0], stmt)
+    }
+
+    async fn create_expr_by_insert_batch(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+        batch: &[InsertBatch],
+    ) -> Result<CreateExpr> {
+        let table_id = None;
+        let create_expr = common_insert::build_create_expr_from_insertion(
+            catalog_name,
+            schema_name,
+            table_id,
+            table_name,
+            batch,
+        )
+        .context(BuildCreateExprOnInsertionSnafu)?;
+
+        Ok(create_expr)
+    }
+}
+
+/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
+fn create_to_expr(
+    table_id: Option<u32>,
+    region_ids: Vec<u32>,
+    create: &CreateTable,
+) -> Result<CreateExpr> {
+    let (catalog_name, schema_name, table_name) =
+        table_idents_to_full_name(&create.name).context(ParseSqlSnafu)?;
+
+    let time_index = find_time_index(&create.constraints)?;
+    let expr = CreateExpr {
+        catalog_name: Some(catalog_name),
+        schema_name: Some(schema_name),
+        table_name,
+        desc: None,
+        column_defs: columns_to_expr(&create.columns, &time_index)?,
+        time_index,
+        primary_keys: find_primary_keys(&create.constraints)?,
+        create_if_not_exists: create.if_not_exists,
+        // TODO(LFC): Fill in other table options.
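+        // Note: only the storage engine name is carried as a table option for
+        // now; the remaining options are covered by the TODO above.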
+        table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
+        table_id,
+        region_ids,
+    };
+    Ok(expr)
+}
+
+fn find_primary_keys(constraints: &[TableConstraint]) -> Result<Vec<String>> {
+    let primary_keys = constraints
+        .iter()
+        .filter_map(|constraint| match constraint {
+            TableConstraint::Unique {
+                name: _,
+                columns,
+                is_primary: true,
+            } => Some(columns.iter().map(|ident| ident.value.clone())),
+            _ => None,
+        })
+        .flatten()
+        .collect::<Vec<String>>();
+    Ok(primary_keys)
+}
+
+pub fn find_time_index(constraints: &[TableConstraint]) -> crate::error::Result<String> {
+    let time_index = constraints
+        .iter()
+        .filter_map(|constraint| match constraint {
+            TableConstraint::Unique {
+                name: Some(name),
+                columns,
+                is_primary: false,
+            } => {
+                if name.value == TIME_INDEX {
+                    Some(columns.iter().map(|ident| &ident.value))
+                } else {
+                    None
+                }
+            }
+            _ => None,
+        })
+        .flatten()
+        .collect::<Vec<&String>>();
+    ensure!(
+        time_index.len() == 1,
+        InvalidSqlSnafu {
+            err_msg: "must have one and only one TimeIndex column",
+        }
+    );
+    Ok(time_index.first().unwrap().to_string())
+}
+
+fn columns_to_expr(
+    column_defs: &[ColumnDef],
+    time_index: &str,
+) -> crate::error::Result<Vec<api::v1::ColumnDef>> {
+    let column_schemas = column_defs
+        .iter()
+        .map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu))
+        .collect::<Result<Vec<ColumnSchema>>>()?;
+
+    let column_datatypes = column_schemas
+        .iter()
+        .map(|c| {
+            ColumnDataTypeWrapper::try_from(c.data_type.clone())
+                .map(|w| w.datatype())
+                .context(ColumnDataTypeSnafu)
+        })
+        .collect::<Result<Vec<ColumnDataType>>>()?;
+
+    column_schemas
+        .iter()
+        .zip(column_datatypes.into_iter())
+        .map(|(schema, datatype)| {
+            Ok(api::v1::ColumnDef {
+                name: schema.name.clone(),
+                datatype: datatype as i32,
+                is_nullable: schema.is_nullable(),
+                default_constraint: match schema.default_constraint() {
+                    None => None,
+                    Some(v) => Some(v.clone().try_into().context(
+                        ConvertColumnDefaultConstraintSnafu {
+                            column_name: &schema.name,
+                        },
+                    )?),
+                },
+            })
+        })
+        .collect()
+}
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 71c9fcf3da..6310faa906 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -7,15 +7,13 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
-use api::helper::ColumnDataTypeWrapper;
 use api::result::ObjectResultBuilder;
 use api::v1::alter_expr::Kind;
 use api::v1::codec::InsertBatch;
 use api::v1::object_expr::Expr;
 use api::v1::{
-    insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, ColumnDataType,
-    ColumnDef as GrpcColumnDef, CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr,
-    ObjectResult as GrpcObjectResult,
+    insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateExpr,
+    InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
 };
 use async_trait::async_trait;
 use catalog::remote::MetaKvBackend;
@@ -26,7 +24,6 @@ use common_error::prelude::{BoxedError, StatusCode};
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
 use common_query::Output;
 use common_telemetry::{debug, error, info};
-use datatypes::schema::ColumnSchema;
 use distributed::DistInstance;
 use meta_client::client::MetaClientBuilder;
 use meta_client::MetaClientOpts;
@@ -36,22 +33,19 @@ use servers::query_handler::{
     PrometheusProtocolHandler, SqlQueryHandler,
 };
 use snafu::prelude::*;
-use sql::ast::{ColumnDef, TableConstraint};
-use sql::statements::create::{CreateTable, TIME_INDEX};
+use sql::statements::create::Partitions;
 use sql::statements::insert::Insert;
 use sql::statements::statement::Statement;
-use sql::statements::{column_def_to_schema, table_idents_to_full_name};
 use sql::{dialect::GenericDialect, parser::ParserContext};
-use table::table::TableIdProviderRef;
 
 use crate::catalog::FrontendCatalogManager;
 use crate::datanode::DatanodeClients;
 use crate::error::{
-    self, AlterTableOnInsertionSnafu, AlterTableSnafu, BuildCreateExprOnInsertionSnafu,
-    BumpTableIdSnafu, CatalogNotFoundSnafu, CatalogSnafu, ConvertColumnDefaultConstraintSnafu,
-    CreateTableOnInsertionSnafu, CreateTableSnafu, DeserializeInsertBatchSnafu,
-    FindNewColumnsOnInsertionSnafu, InsertSnafu, Result, SchemaNotFoundSnafu, SelectSnafu,
+    self, AlterTableOnInsertionSnafu, AlterTableSnafu, CatalogNotFoundSnafu, CatalogSnafu,
+    CreateTableSnafu, DeserializeInsertBatchSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu,
+    Result, SchemaNotFoundSnafu, SelectSnafu,
 };
+use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
 use crate::frontend::{FrontendOptions, Mode};
 use crate::sql::insert_to_request;
 use crate::table::route::TableRoutes;
@@ -80,9 +74,7 @@ pub struct Instance {
     client: Client,
     /// catalog manager is None in standalone mode, datanode will keep their own
     catalog_manager: Option<FrontendCatalogManager>,
-    /// Table id provider, in standalone mode is left to None, but in distributed mode,
-    /// table id should be generated by metasrv.
-    table_id_provider: Option<TableIdProviderRef>,
+    create_expr_factory: CreateExprFactoryRef,
     // TODO(fys): it should be a trait that corresponds to two implementations:
     // Standalone and Distributed, then the code behind it doesn't need to use so
     // many match statements.
@@ -96,7 +88,7 @@ impl Default for Instance {
         Self {
             client: Client::default(),
             catalog_manager: None,
-            table_id_provider: None,
+            create_expr_factory: Arc::new(DefaultCreateExprFactory {}),
             mode: Mode::Standalone,
             dist_instance: None,
         }
@@ -189,42 +181,24 @@ impl Instance {
         }
     }
 
-    /// Convert `CreateTable` statement to `CreateExpr` gRPC request.
-    fn create_to_expr(
-        table_id: Option<u32>,
-        region_ids: Vec<u32>,
-        create: &CreateTable,
-    ) -> Result<CreateExpr> {
-        let (catalog_name, schema_name, table_name) =
-            table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?;
-
-        let time_index = find_time_index(&create.constraints)?;
-        let expr = CreateExpr {
-            catalog_name: Some(catalog_name),
-            schema_name: Some(schema_name),
-            table_name,
-            desc: None,
-            column_defs: columns_to_expr(&create.columns, &time_index)?,
-            time_index,
-            primary_keys: find_primary_keys(&create.constraints)?,
-            create_if_not_exists: create.if_not_exists,
-            // TODO(LFC): Fill in other table options.
-            table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
-            table_id,
-            region_ids,
-        };
-        Ok(expr)
-    }
-
     /// Handle create expr.
-    pub async fn handle_create_table(&self, expr: CreateExpr) -> Result<Output> {
-        let result = self.admin().create(expr.clone()).await;
-        if let Err(e) = &result {
-            error!(e; "Failed to create table by expr: {:?}", expr);
+    pub async fn handle_create_table(
+        &self,
+        mut expr: CreateExpr,
+        partitions: Option<Partitions>,
+    ) -> Result<Output> {
+        if let Some(v) = &self.dist_instance {
+            v.create_table(&mut expr, partitions).await
+        } else {
+            // Currently standalone mode does not support multiple partitions/regions.
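+            // The `partitions` argument is therefore ignored in this branch and
+            // the expr is sent to the local datanode as-is.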
+            let result = self.admin().create(expr.clone()).await;
+            if let Err(e) = &result {
+                error!(e; "Failed to create table by expr: {:?}", expr);
+            }
+            result
+                .and_then(admin_result_to_output)
+                .context(CreateTableSnafu)
         }
-        result
-            .and_then(admin_result_to_output)
-            .context(CreateTableSnafu)
     }
 
     /// Handle create database expr.
@@ -397,29 +371,17 @@ impl Instance {
         insert_batches: &[InsertBatch],
     ) -> Result<Output> {
         // Create table automatically, build schema from data.
-        let table_id = match &self.table_id_provider {
-            Some(provider) => Some(provider.next_table_id().await.context(BumpTableIdSnafu)?),
-            None => None,
-        };
-
-        let create_expr = common_insert::build_create_expr_from_insertion(
-            catalog_name,
-            schema_name,
-            table_id,
-            table_name,
-            insert_batches,
-        )
-        .context(BuildCreateExprOnInsertionSnafu)?;
+        let create_expr = self
+            .create_expr_factory
+            .create_expr_by_insert_batch(catalog_name, schema_name, table_name, insert_batches)
+            .await?;
 
         info!(
             "Try to create table: {} automatically with request: {:?}",
             table_name, create_expr,
        );
 
-        self.admin()
-            .create(create_expr)
-            .await
-            .and_then(admin_result_to_output)
-            .context(CreateTableOnInsertionSnafu)
+        // Create-on-insert does not support partitioning by other columns now.
+        self.handle_create_table(create_expr, None).await
     }
 
     async fn add_new_columns_to_table(
@@ -501,7 +463,7 @@ impl Instance {
         Self {
             client,
             catalog_manager: Some(catalog),
-            table_id_provider: None,
+            create_expr_factory: Arc::new(DefaultCreateExprFactory),
             mode: Mode::Standalone,
             dist_instance: None,
         }
@@ -564,32 +526,17 @@ impl SqlQueryHandler for Instance {
                 }
             },
             Statement::CreateTable(create) => {
-                if let Some(dist_instance) = &self.dist_instance {
-                    dist_instance
-                        .create_table(&create)
-                        .await
-                        .map_err(BoxedError::new)
-                        .context(server_error::ExecuteQuerySnafu { query })
-                } else {
-                    let table_id = match &self.table_id_provider {
-                        Some(provider) => Some(
-                            provider
-                                .next_table_id()
-                                .await
-                                .context(BumpTableIdSnafu)
-                                .map_err(BoxedError::new)
-                                .context(server_error::ExecuteQuerySnafu { query })?,
-                        ),
-                        None => None,
-                    };
-                    let expr = Self::create_to_expr(table_id, vec![0], &create)
-                        .map_err(BoxedError::new)
-                        .context(server_error::ExecuteQuerySnafu { query })?;
-                    self.handle_create_table(expr)
-                        .await
-                        .map_err(BoxedError::new)
-                        .context(server_error::ExecuteQuerySnafu { query })
-                }
+                let create_expr = self
+                    .create_expr_factory
+                    .create_expr_by_stmt(&create)
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(server_error::ExecuteQuerySnafu { query })?;
+
+                self.handle_create_table(create_expr, create.partitions)
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(server_error::ExecuteQuerySnafu { query })
             }
 
             Statement::ShowDatabases(_) | Statement::ShowTables(_) => self
@@ -639,88 +586,6 @@ impl SqlQueryHandler for Instance {
     }
 }
 
-fn find_primary_keys(constraints: &[TableConstraint]) -> Result<Vec<String>> {
-    let primary_keys = constraints
-        .iter()
-        .filter_map(|constraint| match constraint {
-            TableConstraint::Unique {
-                name: _,
-                columns,
-                is_primary: true,
-            } => Some(columns.iter().map(|ident| ident.value.clone())),
-            _ => None,
-        })
-        .flatten()
-        .collect::<Vec<String>>();
-    Ok(primary_keys)
-}
-
-fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
-    let time_index = constraints
-        .iter()
-        .filter_map(|constraint| match constraint {
-            TableConstraint::Unique {
-                name: Some(name),
-                columns,
-                is_primary: false,
-            } => {
-                if name.value == TIME_INDEX {
-                    Some(columns.iter().map(|ident| &ident.value))
-                } else {
-                    None
-                }
-            }
-            _ => None,
-        })
-        .flatten()
-        .collect::<Vec<&String>>();
-    ensure!(
-        time_index.len() == 1,
-        error::InvalidSqlSnafu {
-            err_msg: "must have one and only one TimeIndex columns",
-        }
-    );
-    Ok(time_index.first().unwrap().to_string())
-}
-
-fn columns_to_expr(column_defs: &[ColumnDef], time_index: &str) -> Result<Vec<GrpcColumnDef>> {
-    let column_schemas = column_defs
-        .iter()
-        .map(|c| {
-            column_def_to_schema(c, c.name.to_string() == time_index).context(error::ParseSqlSnafu)
-        })
-        .collect::<Result<Vec<ColumnSchema>>>()?;
-
-    let column_datatypes = column_schemas
-        .iter()
-        .map(|c| {
-            ColumnDataTypeWrapper::try_from(c.data_type.clone())
-                .map(|w| w.datatype())
-                .context(error::ColumnDataTypeSnafu)
-        })
-        .collect::<Result<Vec<ColumnDataType>>>()?;
-
-    column_schemas
-        .iter()
-        .zip(column_datatypes.into_iter())
-        .map(|(schema, datatype)| {
-            Ok(GrpcColumnDef {
-                name: schema.name.clone(),
-                datatype: datatype as i32,
-                is_nullable: schema.is_nullable(),
-                default_constraint: match schema.default_constraint() {
-                    None => None,
-                    Some(v) => Some(v.clone().try_into().context(
-                        ConvertColumnDefaultConstraintSnafu {
-                            column_name: &schema.name,
-                        },
-                    )?),
-                },
-            })
-        })
-        .collect()
-}
-
 #[async_trait]
 impl GrpcQueryHandler for Instance {
     async fn do_query(&self, query: ObjectExpr) -> server_error::Result<GrpcObjectResult> {
@@ -785,7 +650,8 @@ mod tests {
     use api::v1::codec::{InsertBatch, SelectResult};
     use api::v1::{
         admin_expr, admin_result, column, column::SemanticType, object_expr, object_result,
-        select_expr, Column, ExprHeader, MutateResult, SelectExpr,
+        select_expr, Column, ColumnDataType, ColumnDef as GrpcColumnDef, ExprHeader, MutateResult,
+        SelectExpr,
     };
     use datatypes::schema::ColumnDefaultConstraint;
     use datatypes::value::Value;
diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs
index cfd46c4184..4a6daff190 100644
--- a/src/frontend/src/instance/distributed.rs
+++ b/src/frontend/src/instance/distributed.rs
@@ -1,13 +1,17 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use api::helper::ColumnDataTypeWrapper;
+use api::v1::CreateExpr;
 use chrono::DateTime;
 use client::admin::{admin_result_to_output, Admin};
 use client::Select;
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_catalog::{TableGlobalKey, TableGlobalValue};
 use common_query::Output;
 use common_telemetry::debug;
-use datatypes::schema::RawSchema;
+use datatypes::prelude::ConcreteDataType;
+use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema};
 use meta_client::client::MetaClient;
 use meta_client::rpc::{
     CreateRequest as MetaCreateRequest, Partition as MetaPartition, RouteResponse, TableName,
@@ -15,19 +19,16 @@ use meta_client::rpc::{
 };
 use query::{QueryEngineFactory, QueryEngineRef};
 use snafu::{ensure, OptionExt, ResultExt};
-use sql::statements::create::CreateTable;
-use sql::statements::{
-    column_def_to_schema, sql_data_type_to_concrete_data_type, sql_value_to_value,
-    table_idents_to_full_name,
-};
-use sqlparser::ast::ColumnDef;
+use sql::statements::create::Partitions;
+use sql::statements::sql_value_to_value;
 use sqlparser::ast::Value as SqlValue;
 use table::metadata::RawTableMeta;
 
 use crate::catalog::FrontendCatalogManager;
 use crate::datanode::DatanodeClients;
-use crate::error::{self, Result};
-use crate::instance::{find_primary_keys, find_time_index, Instance};
+use crate::error::{
+    self, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu, PrimaryKeyNotFoundSnafu, Result,
+};
 use crate::partitioning::{PartitionBound, PartitionDef};
 
 #[derive(Clone)]
@@ -53,14 +54,17 @@ impl DistInstance {
         }
     }
 
-    pub(crate) async fn create_table(&self, create_table: &CreateTable) -> Result<Output> {
-        let response = self.create_table_in_meta(create_table).await?;
-
+    pub(crate) async fn create_table(
+        &self,
+        create_table: &mut CreateExpr,
+        partitions: Option<Partitions>,
+    ) -> Result<Output> {
+        let response = self.create_table_in_meta(create_table, partitions).await?;
         let table_routes = response.table_routes;
         ensure!(
             table_routes.len() == 1,
             error::FindTableRoutesSnafu {
-                table_name: create_table.name.to_string()
+                table_name: create_table.table_name.to_string()
             }
         );
         let table_route = table_routes.first().unwrap();
@@ -69,10 +73,10 @@ impl DistInstance {
         ensure!(
             !region_routes.is_empty(),
             error::FindRegionRoutesSnafu {
-                table_name: create_table.name.to_string()
+                table_name: create_table.table_name.to_string()
             }
         );
-
+        create_table.table_id = Some(table_route.table.id as u32);
         self.put_table_global_meta(create_table, table_route)
             .await?;
 
@@ -81,18 +85,16 @@ impl DistInstance {
             let client = Admin::new("greptime", client);
 
             let regions = table_route.find_leader_regions(&datanode);
-            let create_expr = Instance::create_to_expr(
-                Some(table_route.table.id as u32),
-                regions.clone(),
-                create_table,
-            )?;
+            let mut create_expr_for_region = create_table.clone();
+            create_expr_for_region.region_ids = regions;
+
             debug!(
                 "Creating table {:?} on Datanode {:?} with regions {:?}",
-                create_table, datanode, regions,
+                create_table, datanode, create_expr_for_region.region_ids,
             );
 
             client
-                .create(create_expr)
+                .create(create_expr_for_region)
                 .await
                 .and_then(admin_result_to_output)
                 .context(error::InvalidAdminResultSnafu)?;
@@ -113,13 +115,24 @@ impl DistInstance {
             .context(error::ExecuteSqlSnafu { sql })
     }
 
-    async fn create_table_in_meta(&self, create_table: &CreateTable) -> Result<RouteResponse> {
-        let (catalog, schema, table) =
-            table_idents_to_full_name(&create_table.name).context(error::ParseSqlSnafu)?;
-        let table_name = TableName::new(catalog, schema, table);
-
-        let partitions = parse_partitions(create_table)?;
+    async fn create_table_in_meta(
+        &self,
+        create_table: &CreateExpr,
+        partitions: Option<Partitions>,
+    ) -> Result<RouteResponse> {
+        let table_name = TableName::new(
+            create_table
+                .catalog_name
+                .clone()
+                .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
+            create_table
+                .schema_name
+                .clone()
+                .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()),
+            create_table.table_name.clone(),
+        );
+        let partitions = parse_partitions(create_table, partitions)?;
         let request = MetaCreateRequest {
             table_name,
             partitions,
@@ -133,7 +146,7 @@ impl DistInstance {
     // TODO(LFC): Maybe move this to FrontendCatalogManager's "register_table" method?
     async fn put_table_global_meta(
         &self,
-        create_table: &CreateTable,
+        create_table: &CreateExpr,
         table_route: &TableRoute,
     ) -> Result<()> {
         let table_name = &table_route.table.table_name;
@@ -156,7 +169,7 @@ impl DistInstance {
 }
 
 fn create_table_global_value(
-    create_table: &CreateTable,
+    create_table: &CreateExpr,
     table_route: &TableRoute,
 ) -> Result<TableGlobalValue> {
     let table_name = &table_route.table.table_name;
@@ -171,43 +184,40 @@ fn create_table_global_value(
         })?
         .id;
 
-    let mut column_schemas = Vec::with_capacity(create_table.columns.len());
-    let time_index = find_time_index(&create_table.constraints)?;
-    for column in create_table.columns.iter() {
-        column_schemas.push(
-            column_def_to_schema(column, column.name.value == time_index)
-                .context(error::ParseSqlSnafu)?,
-        );
+    let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
+    let mut column_name_to_index_map = HashMap::new();
+
+    for (idx, column) in create_table.column_defs.iter().enumerate() {
+        column_schemas.push(create_column_schema(column)?);
+        column_name_to_index_map.insert(column.name.clone(), idx);
     }
-    let timestamp_index = column_schemas.iter().enumerate().find_map(|(i, c)| {
-        if c.name == time_index {
-            Some(i)
-        } else {
-            None
-        }
-    });
+
+    let timestamp_index = column_name_to_index_map
+        .get(&create_table.time_index)
+        .cloned();
+
     let raw_schema = RawSchema {
         column_schemas: column_schemas.clone(),
         timestamp_index,
         version: 0,
     };
 
-    let primary_key_indices = find_primary_keys(&create_table.constraints)?
+    let primary_key_indices = create_table
+        .primary_keys
         .iter()
-        .map(|k| {
-            column_schemas
-                .iter()
-                .enumerate()
-                .find_map(|(i, c)| if &c.name == k { Some(i) } else { None })
-                .unwrap() // unwrap is safe because primary key's column name must have been defined
+        .map(|name| {
+            column_name_to_index_map
+                .get(name)
+                .cloned()
+                .context(PrimaryKeyNotFoundSnafu { msg: name })
         })
-        .collect::<Vec<usize>>();
+        .collect::<Result<Vec<usize>>>()?;
 
     let meta = RawTableMeta {
         schema: raw_schema,
         primary_key_indices,
         value_indices: vec![],
-        engine: create_table.engine.clone(),
+        engine: "mito".to_string(),
         next_column_id: column_schemas.len() as u32,
         region_numbers: vec![],
         engine_options: HashMap::new(),
@@ -223,11 +233,37 @@ fn create_table_global_value(
     })
 }
 
-fn parse_partitions(create_table: &CreateTable) -> Result<Vec<MetaPartition>> {
+// TODO: remove this duplication in the future.
+fn create_column_schema(column_def: &api::v1::ColumnDef) -> Result<ColumnSchema> {
+    let data_type =
+        ColumnDataTypeWrapper::try_new(column_def.datatype).context(error::ColumnDataTypeSnafu)?;
+    let default_constraint = match &column_def.default_constraint {
+        None => None,
+        Some(v) => Some(ColumnDefaultConstraint::try_from(&v[..]).context(
+            ConvertColumnDefaultConstraintSnafu {
+                column_name: &column_def.name,
+            },
+        )?),
+    };
+    ColumnSchema::new(
+        column_def.name.clone(),
+        data_type.into(),
+        column_def.is_nullable,
+    )
+    .with_default_constraint(default_constraint)
+    .context(ConvertColumnDefaultConstraintSnafu {
+        column_name: &column_def.name,
+    })
+}
+
+fn parse_partitions(
+    create_table: &CreateExpr,
+    partitions: Option<Partitions>,
+) -> Result<Vec<MetaPartition>> {
     // If partitions are not defined by user, use the timestamp column (which has to exist) as
     // the partition column, and create only one partition.
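+    // `find_partition_entries` returns one Vec<PartitionBound> per partition;
+    // each entry is then paired with the shared `partition_columns` list to
+    // describe a partition for the meta server.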
-    let partition_columns = find_partition_columns(create_table)?;
-    let partition_entries = find_partition_entries(create_table, &partition_columns)?;
+    let partition_columns = find_partition_columns(create_table, &partitions)?;
+    let partition_entries = find_partition_entries(create_table, &partitions, &partition_columns)?;
 
     partition_entries
         .into_iter()
@@ -236,26 +272,28 @@ fn parse_partitions(
 }
 
 fn find_partition_entries(
-    create_table: &CreateTable,
+    create_table: &CreateExpr,
+    partitions: &Option<Partitions>,
     partition_columns: &[String],
 ) -> Result<Vec<Vec<PartitionBound>>> {
-    let entries = if let Some(partitions) = &create_table.partitions {
+    let entries = if let Some(partitions) = partitions {
         let column_defs = partition_columns
             .iter()
             .map(|pc| {
                 create_table
-                    .columns
+                    .column_defs
                     .iter()
-                    .find(|c| &c.name.value == pc)
+                    .find(|c| &c.name == pc)
                     // unwrap is safe here because we have checked that partition columns are defined
                     .unwrap()
             })
-            .collect::<Vec<&ColumnDef>>();
+            .collect::<Vec<&api::v1::ColumnDef>>();
 
         let mut column_name_and_type = Vec::with_capacity(column_defs.len());
         for column in column_defs {
-            let column_name = &column.name.value;
-            let data_type = sql_data_type_to_concrete_data_type(&column.data_type)
-                .context(error::ParseSqlSnafu)?;
+            let column_name = &column.name;
+            let data_type = ConcreteDataType::from(
+                ColumnDataTypeWrapper::try_new(column.datatype).context(ColumnDataTypeSnafu)?,
+            );
             column_name_and_type.push((column_name, data_type));
         }
 
@@ -283,34 +321,38 @@ fn find_partition_entries(
     Ok(entries)
 }
 
-fn find_partition_columns(create_table: &CreateTable) -> Result<Vec<String>> {
-    let columns = if let Some(partitions) = &create_table.partitions {
+fn find_partition_columns(
+    create_table: &CreateExpr,
+    partitions: &Option<Partitions>,
+) -> Result<Vec<String>> {
+    let columns = if let Some(partitions) = partitions {
         partitions
             .column_list
             .iter()
             .map(|x| x.value.clone())
-            .collect::<Vec<String>>()
+            .collect::<Vec<String>>()
     } else {
-        vec![find_time_index(&create_table.constraints)?]
+        vec![create_table.time_index.clone()]
     };
     Ok(columns)
 }
 
 #[cfg(test)]
 mod test {
-    use sql::parser::ParserContext;
     use sql::statements::statement::Statement;
     use sqlparser::dialect::GenericDialect;
 
     use super::*;
+    use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
 
-    #[test]
-    fn test_parse_partitions() {
+    #[tokio::test]
+    async fn test_parse_partitions() {
+        common_telemetry::init_default_ut_logging();
         let cases = [
             (
                 r"
-CREATE TABLE rcx ( a INT, b STRING, c INT )
+CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
 PARTITION BY RANGE COLUMNS (b) (
   PARTITION r0 VALUES LESS THAN ('hz'),
   PARTITION r1 VALUES LESS THAN ('sh'),
@@ -321,7 +363,7 @@ ENGINE=mito",
             ),
             (
                 r"
-CREATE TABLE rcx ( a INT, b STRING, c INT )
+CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
 PARTITION BY RANGE COLUMNS (b, a) (
   PARTITION r0 VALUES LESS THAN ('hz', 10),
   PARTITION r1 VALUES LESS THAN ('sh', 20),
@@ -335,7 +377,10 @@ ENGINE=mito",
            let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
            match &result[0] {
                Statement::CreateTable(c) => {
-                    let partitions = parse_partitions(c).unwrap();
+                    common_telemetry::info!("{}", sql);
+                    let factory = DefaultCreateExprFactory {};
+                    let expr = factory.create_expr_by_stmt(c).await.unwrap();
+                    let partitions = parse_partitions(&expr, c.partitions.clone()).unwrap();
                     let json = serde_json::to_string(&partitions).unwrap();
                     assert_eq!(json, expected);
                 }
diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs
index 4f13c321e0..a849b09f8d 100644
--- a/src/frontend/src/lib.rs
+++ b/src/frontend/src/lib.rs
@@ -3,6 +3,7 @@
 mod catalog;
 mod datanode;
 pub mod error;
+mod expr_factory;
 pub mod frontend;
 pub mod grpc;
 pub mod influxdb;
diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs
index 34d2e040c9..2f32847a5d 100644
--- a/src/frontend/src/table.rs
+++ b/src/frontend/src/table.rs
@@ -465,6 +465,7 @@ mod test {
 
     use super::*;
     use crate::catalog::FrontendCatalogManager;
+    use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
     use crate::instance::distributed::DistInstance;
     use crate::partitioning::range::RangePartitionRule;
@@ -810,7 +811,12 @@ mod test {
 
         wait_datanodes_alive(kv_store).await;
 
-        let _result = dist_instance.create_table(&create_table).await.unwrap();
+        let factory = DefaultCreateExprFactory {};
+        let mut expr = factory.create_expr_by_stmt(&create_table).await.unwrap();
+        let _result = dist_instance
+            .create_table(&mut expr, create_table.partitions)
+            .await
+            .unwrap();
 
         let table_route = table_routes.get_route(&table_name).await.unwrap();
         println!("{}", serde_json::to_string_pretty(&table_route).unwrap());
diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs
index 0adf21d395..229323b259 100644
--- a/src/sql/src/statements.rs
+++ b/src/sql/src/statements.rs
@@ -242,7 +242,7 @@ pub fn column_def_to_schema(column_def: &ColumnDef, is_time_index: bool) -> Resu
 }
 
 /// Convert `ColumnDef` in sqlparser to `ColumnDef` in gRPC proto.
-fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result<api::v1::ColumnDef> {
+pub fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result<api::v1::ColumnDef> {
     let name = col.name.value.clone();
     let data_type = sql_data_type_to_concrete_data_type(&col.data_type)?;
     let nullable = col