From 66bca114013d204f77098600656bd2c0189f010f Mon Sep 17 00:00:00 2001 From: Jiachun Feng Date: Fri, 16 Dec 2022 15:47:51 +0800 Subject: [PATCH] refactor: remove optional from the protos (#756) --- benchmarks/src/bin/nyc-taxi.rs | 52 +++++++++--------- src/api/greptime/v1/admin.proto | 27 +++++----- src/api/greptime/v1/column.proto | 2 +- src/api/src/v1/column_def.rs | 11 ++-- src/client/examples/logical.rs | 18 +++---- src/client/src/admin.rs | 4 +- src/common/grpc-expr/src/alter.rs | 58 +++++++++++++------- src/common/grpc-expr/src/insert.rs | 20 +++---- src/datanode/src/instance/grpc.rs | 4 +- src/datanode/src/server/grpc.rs | 42 ++++++++------- src/frontend/src/expr_factory.rs | 36 +++++++------ src/frontend/src/instance.rs | 40 +++++++------- src/frontend/src/instance/distributed.rs | 68 +++++++++++++++--------- src/sql/src/statements.rs | 4 +- src/sql/src/statements/alter.rs | 8 +-- tests-integration/tests/grpc.rs | 30 ++++++----- 16 files changed, 235 insertions(+), 189 deletions(-) diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index f39b48c87e..0d097fb3a0 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -28,7 +28,7 @@ use arrow::record_batch::RecordBatch; use clap::Parser; use client::admin::Admin; use client::api::v1::column::Values; -use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr}; +use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertExpr, TableId}; use client::{Client, Database, Select}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -219,126 +219,126 @@ fn build_values(column: &ArrayRef) -> Values { } } -fn create_table_expr() -> CreateExpr { - CreateExpr { - catalog_name: Some(CATALOG_NAME.to_string()), - schema_name: Some(SCHEMA_NAME.to_string()), +fn create_table_expr() -> CreateTableExpr { + CreateTableExpr { + catalog_name: CATALOG_NAME.to_string(), + schema_name: SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), - desc: None, + desc: "".to_string(), column_defs: vec![ ColumnDef { name: "VendorID".to_string(), datatype: ColumnDataType::Int64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "tpep_pickup_datetime".to_string(), datatype: ColumnDataType::Int64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "tpep_dropoff_datetime".to_string(), datatype: ColumnDataType::Int64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "passenger_count".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "trip_distance".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "RatecodeID".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "store_and_fwd_flag".to_string(), datatype: ColumnDataType::String as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "PULocationID".to_string(), datatype: ColumnDataType::Int64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "DOLocationID".to_string(), datatype: ColumnDataType::Int64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "payment_type".to_string(), datatype: ColumnDataType::Int64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "fare_amount".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "extra".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "mta_tax".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "tip_amount".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "tolls_amount".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "improvement_surcharge".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "total_amount".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "congestion_surcharge".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "airport_fee".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ], time_index: "tpep_pickup_datetime".to_string(), @@ -346,7 +346,7 @@ fn create_table_expr() -> CreateExpr { create_if_not_exists: false, table_options: Default::default(), region_ids: vec![0], - table_id: Some(0), + table_id: Some(TableId { id: 0 }), } } diff --git a/src/api/greptime/v1/admin.proto b/src/api/greptime/v1/admin.proto index 3f253cde0f..d5c9e93a7c 100644 --- a/src/api/greptime/v1/admin.proto +++ b/src/api/greptime/v1/admin.proto @@ -17,7 +17,7 @@ message AdminResponse { message AdminExpr { ExprHeader header = 1; oneof expr { - CreateExpr create = 2; + CreateTableExpr create_table = 2; AlterExpr alter = 3; CreateDatabaseExpr create_database = 4; DropTableExpr drop_table = 5; @@ -31,24 +31,23 @@ message AdminResult { } } -// TODO(hl): rename to CreateTableExpr -message CreateExpr { - optional string catalog_name = 1; - optional string schema_name = 2; +message CreateTableExpr { + string catalog_name = 1; + string schema_name = 2; string table_name = 3; - optional string desc = 4; + string desc = 4; repeated ColumnDef column_defs = 5; string time_index = 6; repeated string primary_keys = 7; bool create_if_not_exists = 8; map table_options = 9; - optional uint32 table_id = 10; + TableId table_id = 10; repeated uint32 region_ids = 11; } message AlterExpr { - optional string catalog_name = 1; - optional string schema_name = 2; + string catalog_name = 1; + string schema_name = 2; string table_name = 3; oneof kind { AddColumns add_columns = 4; @@ -62,6 +61,11 @@ message DropTableExpr { string table_name = 3; } +message CreateDatabaseExpr { + //TODO(hl): maybe rename to schema_name? + string database_name = 1; +} + message AddColumns { repeated AddColumn add_columns = 1; } @@ -79,7 +83,6 @@ message DropColumn { string name = 1; } -message CreateDatabaseExpr { - //TODO(hl): maybe rename to schema_name? - string database_name = 1; +message TableId { + uint32 id = 1; } diff --git a/src/api/greptime/v1/column.proto b/src/api/greptime/v1/column.proto index 6f5692747e..c1bda12142 100644 --- a/src/api/greptime/v1/column.proto +++ b/src/api/greptime/v1/column.proto @@ -59,7 +59,7 @@ message ColumnDef { string name = 1; ColumnDataType datatype = 2; bool is_nullable = 3; - optional bytes default_constraint = 4; + bytes default_constraint = 4; } enum ColumnDataType { diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index 131ad75764..828adfccf8 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -23,12 +23,13 @@ impl ColumnDef { pub fn try_as_column_schema(&self) -> Result { let data_type = ColumnDataTypeWrapper::try_new(self.datatype)?; - let constraint = match &self.default_constraint { - None => None, - Some(v) => Some( - ColumnDefaultConstraint::try_from(&v[..]) + let constraint = if self.default_constraint.is_empty() { + None + } else { + Some( + ColumnDefaultConstraint::try_from(self.default_constraint.as_slice()) .context(error::ConvertColumnDefaultConstraintSnafu { column: &self.name })?, - ), + ) }; ColumnSchema::new(&self.name, data_type.into(), self.is_nullable) diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 9ea6cdc42f..3fe3ff0a7b 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::{ColumnDataType, ColumnDef, CreateExpr}; +use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId}; use client::admin::Admin; use client::{Client, Database}; use prost_09::Message; @@ -33,36 +33,36 @@ fn main() { async fn run() { let client = Client::with_urls(vec!["127.0.0.1:3001"]); - let create_table_expr = CreateExpr { - catalog_name: Some("greptime".to_string()), - schema_name: Some("public".to_string()), + let create_table_expr = CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), table_name: "test_logical_dist_exec".to_string(), - desc: None, + desc: "".to_string(), column_defs: vec![ ColumnDef { name: "timestamp".to_string(), datatype: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "key".to_string(), datatype: ColumnDataType::Uint64 as i32, is_nullable: false, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "value".to_string(), datatype: ColumnDataType::Uint64 as i32, is_nullable: false, - default_constraint: None, + default_constraint: vec![], }, ], time_index: "timestamp".to_string(), primary_keys: vec!["key".to_string()], create_if_not_exists: false, table_options: Default::default(), - table_id: Some(1024), + table_id: Some(TableId { id: 1024 }), region_ids: vec![0], }; diff --git a/src/client/src/admin.rs b/src/client/src/admin.rs index f70aea0356..2a22f7a26e 100644 --- a/src/client/src/admin.rs +++ b/src/client/src/admin.rs @@ -34,13 +34,13 @@ impl Admin { } } - pub async fn create(&self, expr: CreateExpr) -> Result { + pub async fn create(&self, expr: CreateTableExpr) -> Result { let header = ExprHeader { version: PROTOCOL_VERSION, }; let expr = AdminExpr { header: Some(header), - expr: Some(admin_expr::Expr::Create(expr)), + expr: Some(admin_expr::Expr::CreateTable(expr)), }; self.do_request(expr).await } diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index cdef37cbcb..8f43932c35 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::v1::alter_expr::Kind; -use api::v1::{AlterExpr, CreateExpr, DropColumns}; +use api::v1::{AlterExpr, CreateTableExpr, DropColumns}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use snafu::{ensure, OptionExt, ResultExt}; @@ -29,6 +29,16 @@ use crate::error::{ /// Convert an [`AlterExpr`] to an optional [`AlterTableRequest`] pub fn alter_expr_to_request(expr: AlterExpr) -> Result> { + let catalog_name = if expr.catalog_name.is_empty() { + None + } else { + Some(expr.catalog_name) + }; + let schema_name = if expr.schema_name.is_empty() { + None + } else { + Some(expr.schema_name) + }; match expr.kind { Some(Kind::AddColumns(add_columns)) => { let add_column_requests = add_columns @@ -57,8 +67,8 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result Result Result Result { +pub fn create_table_schema(expr: &CreateTableExpr) -> Result { let column_schemas = expr .column_defs .iter() @@ -119,7 +129,10 @@ pub fn create_table_schema(expr: &CreateExpr) -> Result { )) } -pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result { +pub fn create_expr_to_request( + table_id: TableId, + expr: CreateTableExpr, +) -> Result { let schema = create_table_schema(&expr)?; let primary_key_indices = expr .primary_keys @@ -134,12 +147,19 @@ pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result>>()?; - let catalog_name = expr - .catalog_name - .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()); - let schema_name = expr - .schema_name - .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); + let mut catalog_name = expr.catalog_name; + if catalog_name.is_empty() { + catalog_name = DEFAULT_CATALOG_NAME.to_string(); + } + let mut schema_name = expr.schema_name; + if schema_name.is_empty() { + schema_name = DEFAULT_SCHEMA_NAME.to_string(); + } + let desc = if expr.desc.is_empty() { + None + } else { + Some(expr.desc) + }; let region_ids = if expr.region_ids.is_empty() { vec![0] @@ -152,7 +172,7 @@ pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result ColumnD name: column_name.to_string(), datatype, is_nullable: nullable, - default_constraint: None, + default_constraint: vec![], } } @@ -214,7 +214,7 @@ pub fn build_create_expr_from_insertion( table_id: Option, table_name: &str, columns: &[Column], -) -> Result { +) -> Result { let mut new_columns: HashSet = HashSet::default(); let mut column_defs = Vec::default(); let mut primary_key_indices = Vec::default(); @@ -263,17 +263,17 @@ pub fn build_create_expr_from_insertion( .map(|idx| columns[*idx].column_name.clone()) .collect::>(); - let expr = CreateExpr { - catalog_name: Some(catalog_name.to_string()), - schema_name: Some(schema_name.to_string()), + let expr = CreateTableExpr { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), table_name: table_name.to_string(), - desc: Some("Created on insertion".to_string()), + desc: "Created on insertion".to_string(), column_defs, time_index: timestamp_field_name, primary_keys, create_if_not_exists: true, table_options: Default::default(), - table_id, + table_id: table_id.map(|id| api::v1::TableId { id }), region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend }; @@ -516,9 +516,9 @@ mod tests { build_create_expr_from_insertion("", "", table_id, table_name, &insert_batch.0) .unwrap(); - assert_eq!(table_id, create_expr.table_id); + assert_eq!(table_id, create_expr.table_id.map(|x| x.id)); assert_eq!(table_name, create_expr.table_name); - assert_eq!(Some("Created on insertion".to_string()), create_expr.desc); + assert_eq!("Created on insertion".to_string(), create_expr.desc); assert_eq!( vec![create_expr.column_defs[0].name.clone()], create_expr.primary_keys diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 3f6d531633..0817abcb96 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -188,7 +188,9 @@ impl GrpcQueryHandler for Instance { impl GrpcAdminHandler for Instance { async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result { let admin_resp = match expr.expr { - Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await, + Some(admin_expr::Expr::CreateTable(create_expr)) => { + self.handle_create(create_expr).await + } Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await, Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => { self.execute_create_database(create_database_expr).await diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 5109522541..3fa54f3b39 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::result::AdminResultBuilder; -use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr}; +use api::v1::{AdminResult, AlterExpr, CreateTableExpr, DropTableExpr}; use common_error::prelude::{ErrorExt, StatusCode}; use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; use common_query::Output; @@ -31,15 +31,15 @@ use crate::sql::SqlRequest; impl Instance { /// Handle gRPC create table requests. - pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult { + pub(crate) async fn handle_create(&self, expr: CreateTableExpr) -> AdminResult { // Respect CreateExpr's table id and region ids if present, or allocate table id // from local table id provider and set region id to 0. - let table_id = if let Some(table_id) = expr.table_id { + let table_id = if let Some(table_id) = &expr.table_id { info!( "Creating table {:?}.{:?}.{:?} with table id from frontend: {}", - expr.catalog_name, expr.schema_name, expr.table_name, table_id + expr.catalog_name, expr.schema_name, expr.table_name, table_id.id ); - table_id + table_id.id } else { match self.table_id_provider.as_ref() { None => { @@ -157,7 +157,7 @@ impl Instance { mod tests { use std::sync::Arc; - use api::v1::{ColumnDataType, ColumnDef}; + use api::v1::{ColumnDataType, ColumnDef, TableId}; use common_catalog::consts::MIN_USER_TABLE_ID; use common_grpc_expr::create_table_schema; use datatypes::prelude::ConcreteDataType; @@ -175,7 +175,7 @@ mod tests { assert_eq!(request.catalog_name, "greptime".to_string()); assert_eq!(request.schema_name, "public".to_string()); assert_eq!(request.table_name, "my-metrics"); - assert_eq!(request.desc, Some("blabla".to_string())); + assert_eq!(request.desc, Some("blabla little magic fairy".to_string())); assert_eq!(request.schema, expected_table_schema()); assert_eq!(request.primary_key_indices, vec![1, 0]); assert!(request.create_if_not_exists); @@ -214,7 +214,7 @@ mod tests { name: "a".to_string(), datatype: 1024, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }; let result = column_def.try_as_column_schema(); assert!(matches!( @@ -226,7 +226,7 @@ mod tests { name: "a".to_string(), datatype: ColumnDataType::String as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }; let column_schema = column_def.try_as_column_schema().unwrap(); assert_eq!(column_schema.name, "a"); @@ -238,7 +238,7 @@ mod tests { name: "a".to_string(), datatype: ColumnDataType::String as i32, is_nullable: true, - default_constraint: Some(default_constraint.clone().try_into().unwrap()), + default_constraint: default_constraint.clone().try_into().unwrap(), }; let column_schema = column_def.try_as_column_schema().unwrap(); assert_eq!(column_schema.name, "a"); @@ -250,44 +250,46 @@ mod tests { ); } - fn testing_create_expr() -> CreateExpr { + fn testing_create_expr() -> CreateTableExpr { let column_defs = vec![ ColumnDef { name: "host".to_string(), datatype: ColumnDataType::String as i32, is_nullable: false, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "ts".to_string(), datatype: ColumnDataType::TimestampMillisecond as i32, is_nullable: false, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "cpu".to_string(), datatype: ColumnDataType::Float32 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "memory".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ]; - CreateExpr { - catalog_name: None, - schema_name: None, + CreateTableExpr { + catalog_name: "".to_string(), + schema_name: "".to_string(), table_name: "my-metrics".to_string(), - desc: Some("blabla".to_string()), + desc: "blabla little magic fairy".to_string(), column_defs, time_index: "ts".to_string(), primary_keys: vec!["ts".to_string(), "host".to_string()], create_if_not_exists: true, table_options: Default::default(), - table_id: Some(MIN_USER_TABLE_ID), + table_id: Some(TableId { + id: MIN_USER_TABLE_ID, + }), region_ids: vec![0], } } diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index 204eb42d92..dff5a768f3 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::{Column, ColumnDataType, CreateExpr}; +use api::v1::{Column, ColumnDataType, CreateTableExpr}; use datatypes::schema::ColumnSchema; use snafu::{ensure, ResultExt}; use sql::ast::{ColumnDef, TableConstraint}; @@ -32,7 +32,7 @@ pub type CreateExprFactoryRef = Arc; #[async_trait::async_trait] pub trait CreateExprFactory { - async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result; + async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result; async fn create_expr_by_columns( &self, @@ -40,7 +40,7 @@ pub trait CreateExprFactory { schema_name: &str, table_name: &str, columns: &[Column], - ) -> crate::error::Result; + ) -> crate::error::Result; } #[derive(Debug)] @@ -48,7 +48,7 @@ pub struct DefaultCreateExprFactory; #[async_trait::async_trait] impl CreateExprFactory for DefaultCreateExprFactory { - async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result { + async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result { create_to_expr(None, vec![0], stmt) } @@ -58,7 +58,7 @@ impl CreateExprFactory for DefaultCreateExprFactory { schema_name: &str, table_name: &str, columns: &[Column], - ) -> Result { + ) -> Result { let table_id = None; let create_expr = common_grpc_expr::build_create_expr_from_insertion( catalog_name, @@ -78,23 +78,23 @@ fn create_to_expr( table_id: Option, region_ids: Vec, create: &CreateTable, -) -> Result { +) -> Result { let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&create.name).context(ParseSqlSnafu)?; let time_index = find_time_index(&create.constraints)?; - let expr = CreateExpr { - catalog_name: Some(catalog_name), - schema_name: Some(schema_name), + let expr = CreateTableExpr { + catalog_name, + schema_name, table_name, - desc: None, + desc: "".to_string(), column_defs: columns_to_expr(&create.columns, &time_index)?, time_index, primary_keys: find_primary_keys(&create.constraints)?, create_if_not_exists: create.if_not_exists, // TODO(LFC): Fill in other table options. table_options: HashMap::from([("engine".to_string(), create.engine.clone())]), - table_id, + table_id: table_id.map(|id| api::v1::TableId { id }), region_ids, }; Ok(expr) @@ -171,12 +171,14 @@ fn columns_to_expr( datatype: datatype as i32, is_nullable: schema.is_nullable(), default_constraint: match schema.default_constraint() { - None => None, - Some(v) => Some(v.clone().try_into().context( - ConvertColumnDefaultConstraintSnafu { - column_name: &schema.name, - }, - )?), + None => vec![], + Some(v) => { + v.clone() + .try_into() + .context(ConvertColumnDefaultConstraintSnafu { + column_name: &schema.name, + })? + } }, }) }) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 730c16d3b4..849b43c984 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -25,7 +25,7 @@ use api::v1::alter_expr::Kind; use api::v1::object_expr::Expr; use api::v1::{ admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr, - CreateExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr, + CreateTableExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult, }; use async_trait::async_trait; @@ -196,7 +196,7 @@ impl Instance { /// Handle create expr. pub async fn handle_create_table( &self, - mut expr: CreateExpr, + mut expr: CreateTableExpr, partitions: Option, ) -> Result { if let Some(v) = &self.dist_instance { @@ -206,7 +206,7 @@ impl Instance { header: Some(ExprHeader { version: PROTOCOL_VERSION, }), - expr: Some(admin_expr::Expr::Create(expr)), + expr: Some(admin_expr::Expr::CreateTable(expr)), }; let result = self .grpc_admin_handler @@ -359,8 +359,8 @@ impl Instance { ); let expr = AlterExpr { table_name: table_name.to_string(), - schema_name: Some(schema_name.to_string()), - catalog_name: Some(catalog_name.to_string()), + schema_name: schema_name.to_string(), + catalog_name: catalog_name.to_string(), kind: Some(Kind::AddColumns(add_columns)), }; @@ -630,7 +630,7 @@ impl GrpcAdminHandler for Instance { async fn exec_admin_request(&self, mut expr: AdminExpr) -> server_error::Result { // Force the default to be `None` rather than `Some(0)` comes from gRPC decode. // Related issue: #480 - if let Some(api::v1::admin_expr::Expr::Create(create)) = &mut expr.expr { + if let Some(api::v1::admin_expr::Expr::CreateTable(create)) = &mut expr.expr { create.table_id = None; } self.grpc_admin_handler.exec_admin_request(expr).await @@ -808,7 +808,7 @@ mod tests { let create_expr = create_expr(); let admin_expr = AdminExpr { header: Some(ExprHeader::default()), - expr: Some(admin_expr::Expr::Create(create_expr)), + expr: Some(admin_expr::Expr::CreateTable(create_expr)), }; let result = GrpcAdminHandler::exec_admin_request(&*instance, admin_expr) .await @@ -886,48 +886,46 @@ mod tests { } } - fn create_expr() -> CreateExpr { + fn create_expr() -> CreateTableExpr { let column_defs = vec![ GrpcColumnDef { name: "host".to_string(), datatype: ColumnDataType::String as i32, is_nullable: false, - default_constraint: None, + default_constraint: vec![], }, GrpcColumnDef { name: "cpu".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, GrpcColumnDef { name: "memory".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, GrpcColumnDef { name: "disk_util".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: Some( - ColumnDefaultConstraint::Value(Value::from(9.9f64)) - .try_into() - .unwrap(), - ), + default_constraint: ColumnDefaultConstraint::Value(Value::from(9.9f64)) + .try_into() + .unwrap(), }, GrpcColumnDef { name: "ts".to_string(), datatype: ColumnDataType::TimestampMillisecond as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ]; - CreateExpr { - catalog_name: None, - schema_name: None, + CreateTableExpr { + catalog_name: "".to_string(), + schema_name: "".to_string(), table_name: "demo".to_string(), - desc: None, + desc: "".to_string(), column_defs, time_index: "ts".to_string(), primary_keys: vec!["host".to_string()], diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index a44e4596fa..5d46823f01 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::result::AdminResultBuilder; use api::v1::{ - admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateExpr, ObjectExpr, - ObjectResult, + admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateTableExpr, ObjectExpr, + ObjectResult, TableId, }; use async_trait::async_trait; use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue}; @@ -86,7 +86,7 @@ impl DistInstance { pub(crate) async fn create_table( &self, - create_table: &mut CreateExpr, + create_table: &mut CreateTableExpr, partitions: Option, ) -> Result { let response = self.create_table_in_meta(create_table, partitions).await?; @@ -112,7 +112,9 @@ impl DistInstance { table_name: create_table.table_name.to_string() } ); - create_table.table_id = Some(table_route.table.id as u32); + create_table.table_id = Some(TableId { + id: table_route.table.id as u32, + }); self.put_table_global_meta(create_table, table_route) .await?; @@ -194,8 +196,16 @@ impl DistInstance { } async fn handle_alter_table(&self, expr: AlterExpr) -> Result { - let catalog_name = expr.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME); - let schema_name = expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME); + let catalog_name = if expr.catalog_name.is_empty() { + DEFAULT_CATALOG_NAME + } else { + expr.catalog_name.as_str() + }; + let schema_name = if expr.schema_name.is_empty() { + DEFAULT_SCHEMA_NAME + } else { + expr.schema_name.as_str() + }; let table_name = expr.table_name.as_str(); let table = self .catalog_manager @@ -223,20 +233,18 @@ impl DistInstance { async fn create_table_in_meta( &self, - create_table: &CreateExpr, + create_table: &CreateTableExpr, partitions: Option, ) -> Result { - let table_name = TableName::new( - create_table - .catalog_name - .clone() - .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()), - create_table - .schema_name - .clone() - .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()), - create_table.table_name.clone(), - ); + let mut catalog_name = create_table.catalog_name.clone(); + if catalog_name.is_empty() { + catalog_name = DEFAULT_CATALOG_NAME.to_string(); + } + let mut schema_name = create_table.schema_name.clone(); + if schema_name.is_empty() { + schema_name = DEFAULT_SCHEMA_NAME.to_string(); + } + let table_name = TableName::new(catalog_name, schema_name, create_table.table_name.clone()); let partitions = parse_partitions(create_table, partitions)?; let request = MetaCreateRequest { @@ -252,7 +260,7 @@ impl DistInstance { // TODO(LFC): Maybe move this to FrontendCatalogManager's "register_table" method? async fn put_table_global_meta( &self, - create_table: &CreateExpr, + create_table: &CreateTableExpr, table_route: &TableRoute, ) -> Result<()> { let table_name = &table_route.table.table_name; @@ -274,10 +282,12 @@ impl DistInstance { .await .context(CatalogSnafu)? { - let existing_bytes = existing.unwrap(); //this unwrap is safe since we compare with empty bytes and failed + let existing_bytes = existing.unwrap(); // this unwrap is safe since we compare with empty bytes and failed let existing_value = TableGlobalValue::from_bytes(&existing_bytes).context(CatalogEntrySerdeSnafu)?; - if existing_value.table_info.ident.table_id != create_table.table_id.unwrap() { + if existing_value.table_info.ident.table_id + != create_table.table_id.as_ref().unwrap().id + { error!( "Table with name {} already exists, value in catalog: {:?}", key, existing_bytes @@ -340,7 +350,7 @@ impl GrpcAdminHandler for DistInstance { } fn create_table_global_value( - create_table: &CreateExpr, + create_table: &CreateTableExpr, table_route: &TableRoute, ) -> Result { let table_name = &table_route.table.table_name; @@ -419,13 +429,19 @@ fn create_table_global_value( created_on: DateTime::default(), }; + let desc = if create_table.desc.is_empty() { + None + } else { + Some(create_table.desc.clone()) + }; + let table_info = RawTableInfo { ident: TableIdent { table_id: table_route.table.id as u32, version: 0, }, name: table_name.table_name.clone(), - desc: create_table.desc.clone(), + desc, catalog_name: table_name.catalog_name.clone(), schema_name: table_name.schema_name.clone(), meta, @@ -440,7 +456,7 @@ fn create_table_global_value( } fn parse_partitions( - create_table: &CreateExpr, + create_table: &CreateTableExpr, partitions: Option, ) -> Result> { // If partitions are not defined by user, use the timestamp column (which has to be existed) as @@ -455,7 +471,7 @@ fn parse_partitions( } fn find_partition_entries( - create_table: &CreateExpr, + create_table: &CreateTableExpr, partitions: &Option, partition_columns: &[String], ) -> Result>> { @@ -505,7 +521,7 @@ fn find_partition_entries( } fn find_partition_columns( - create_table: &CreateExpr, + create_table: &CreateTableExpr, partitions: &Option, ) -> Result> { let columns = if let Some(partitions) = partitions { diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 6dcc5fd220..c9a4d58484 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -283,7 +283,7 @@ pub fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result for AlterExpr { type Error = crate::error::Error; fn try_from(value: AlterTable) -> Result { - let (catalog, schema, table) = table_idents_to_full_name(&value.table_name)?; + let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&value.table_name)?; let kind = match value.alter_operation { AlterTableOperation::AddConstraint(_) => { @@ -80,9 +80,9 @@ impl TryFrom for AlterExpr { } }; let expr = AlterExpr { - catalog_name: Some(catalog), - schema_name: Some(schema), - table_name: table, + catalog_name, + schema_name, + table_name, kind: Some(kind), }; diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 7ebce04509..e38b648eda 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -15,7 +15,7 @@ use api::v1::alter_expr::Kind; use api::v1::column::SemanticType; use api::v1::{ admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateExpr, InsertExpr, MutateResult, + CreateTableExpr, InsertExpr, MutateResult, TableId, }; use client::admin::Admin; use client::{Client, Database, ObjectResult}; @@ -151,7 +151,7 @@ pub async fn test_insert_and_select(store_type: StorageType) { name: "test_column".to_string(), datatype: ColumnDataType::Int64.into(), is_nullable: true, - default_constraint: None, + default_constraint: vec![], }; let kind = Kind::AddColumns(AddColumns { add_columns: vec![AddColumn { @@ -161,8 +161,8 @@ pub async fn test_insert_and_select(store_type: StorageType) { }); let expr = AlterExpr { table_name: "test_table".to_string(), - catalog_name: None, - schema_name: None, + catalog_name: "".to_string(), + schema_name: "".to_string(), kind: Some(kind), }; let result = admin.alter(expr).await.unwrap(); @@ -222,44 +222,46 @@ async fn insert_and_assert(db: &Database) { } } -fn testing_create_expr() -> CreateExpr { +fn testing_create_expr() -> CreateTableExpr { let column_defs = vec![ ColumnDef { name: "host".to_string(), datatype: ColumnDataType::String as i32, is_nullable: false, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "cpu".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "memory".to_string(), datatype: ColumnDataType::Float64 as i32, is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ColumnDef { name: "ts".to_string(), datatype: ColumnDataType::TimestampMillisecond as i32, // timestamp is_nullable: true, - default_constraint: None, + default_constraint: vec![], }, ]; - CreateExpr { - catalog_name: None, - schema_name: None, + CreateTableExpr { + catalog_name: "".to_string(), + schema_name: "".to_string(), table_name: "demo".to_string(), - desc: Some("blabla".to_string()), + desc: "blabla little magic fairy".to_string(), column_defs, time_index: "ts".to_string(), primary_keys: vec!["host".to_string()], create_if_not_exists: true, table_options: Default::default(), - table_id: Some(MIN_USER_TABLE_ID), + table_id: Some(TableId { + id: MIN_USER_TABLE_ID, + }), region_ids: vec![0], } }