From 119ff2fc2ed2a2c439920051929faa78b4a91e14 Mon Sep 17 00:00:00 2001 From: LFC Date: Tue, 6 Sep 2022 12:51:07 +0800 Subject: [PATCH] feat: create table through GRPC interface (#224) * feat: create table through GRPC interface * move `CreateExpr` `oneof` expr of `AdminExpr` in `admin.proto`, and implement the admin GRPC interface * add `table_options` and `partition_options` to `CreateExpr` * resolve code review comments Co-authored-by: luofucong --- Cargo.lock | 1 + src/api/greptime/v1/admin.proto | 40 +++- src/api/greptime/v1/column.proto | 24 +++ src/api/greptime/v1/common.proto | 18 ++ src/api/greptime/v1/database.proto | 17 +- src/catalog/src/system.rs | 1 + src/client/src/admin.rs | 48 ++++- src/client/src/database.rs | 65 +++--- src/client/src/lib.rs | 1 + src/datanode/Cargo.toml | 1 + src/datanode/src/error.rs | 6 +- src/datanode/src/instance.rs | 23 ++- src/datanode/src/server.rs | 2 +- src/datanode/src/server/grpc.rs | 1 + src/datanode/src/server/grpc/create.rs | 262 ++++++++++++++++++++++++ src/datanode/src/server/grpc/handler.rs | 59 +++++- src/datanode/src/sql/create.rs | 1 + src/datanode/src/tests.rs | 2 +- src/datanode/src/tests/grpc_test.rs | 61 +++++- src/datanode/src/tests/test_util.rs | 2 + src/servers/src/grpc.rs | 12 +- src/servers/src/grpc/handler.rs | 21 +- src/servers/src/query_handler.rs | 8 +- src/table-engine/src/engine.rs | 2 + src/table-engine/src/table/test_util.rs | 3 + src/table/src/requests.rs | 1 + 26 files changed, 605 insertions(+), 77 deletions(-) create mode 100644 src/api/greptime/v1/common.proto create mode 100644 src/datanode/src/server/grpc/create.rs diff --git a/Cargo.lock b/Cargo.lock index d0ed1ef2da..16ff64bdc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1357,6 +1357,7 @@ dependencies = [ "common-time", "datafusion", "datatypes", + "futures", "hyper", "log-store", "metrics 0.20.1", diff --git a/src/api/greptime/v1/admin.proto b/src/api/greptime/v1/admin.proto index 327b5d75b6..bdf87466f9 100644 --- a/src/api/greptime/v1/admin.proto +++ b/src/api/greptime/v1/admin.proto @@ -2,8 +2,40 @@ syntax = "proto3"; package greptime.v1; -// TODO(jiachun) -message AdminRequest {} +import "greptime/v1/column.proto"; +import "greptime/v1/common.proto"; -// TODO(jiachun) -message AdminResponse {} +message AdminRequest { + string name = 1; + repeated AdminExpr exprs = 2; +} + +message AdminResponse { + repeated AdminResult results = 1; +} + +message AdminExpr { + ExprHeader header = 1; + oneof expr { + CreateExpr create = 2; + } +} + +message AdminResult { + ResultHeader header = 1; + oneof result { + MutateResult mutate = 2; + } +} + +message CreateExpr { + optional string catalog_name = 1; + optional string schema_name = 2; + string table_name = 3; + optional string desc = 4; + repeated ColumnDef column_defs = 5; + string time_index = 6; + repeated string primary_keys = 7; + bool create_if_not_exists = 8; + map table_options = 9; +} diff --git a/src/api/greptime/v1/column.proto b/src/api/greptime/v1/column.proto index a66c7d2369..45dbd6bcbf 100644 --- a/src/api/greptime/v1/column.proto +++ b/src/api/greptime/v1/column.proto @@ -44,3 +44,27 @@ message Column { // If a bit in null_mask is 1, it indicates that the column value at that position is null. bytes null_mask = 4; } + +message ColumnDef { + string name = 1; + ColumnDataType data_type = 2; + bool is_nullable = 3; +} + +enum ColumnDataType { + BOOLEAN = 0; + INT8 = 1; + INT16 = 2; + INT32 = 3; + INT64 = 4; + UINT8 = 5; + UINT16 = 6; + UINT32 = 7; + UINT64 = 8; + FLOAT32 = 9; + FLOAT64 = 10; + BINARY = 11; + STRING = 12; + DATE = 13; + DATETIME = 14; +} diff --git a/src/api/greptime/v1/common.proto b/src/api/greptime/v1/common.proto new file mode 100644 index 0000000000..bda7a54a70 --- /dev/null +++ b/src/api/greptime/v1/common.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package greptime.v1; + +message ExprHeader { + uint32 version = 1; +} + +message ResultHeader { + uint32 version = 1; + uint32 code = 2; + string err_msg = 3; +} + +message MutateResult { + uint32 success = 1; + uint32 failure = 2; +} diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 6d59ef60d1..b9202d975d 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package greptime.v1; +import "greptime/v1/common.proto"; + message DatabaseRequest { string name = 1; repeated ObjectExpr exprs = 2; @@ -21,10 +23,6 @@ message ObjectExpr { } } -message ExprHeader { - uint32 version = 1; -} - // TODO(fys): Only support sql now, and will support promql etc in the future message SelectExpr { oneof expr { @@ -59,14 +57,3 @@ message ObjectResult { message SelectResult { bytes raw_data = 1; } - -message ResultHeader { - uint32 version = 1; - uint32 code = 2; - string err_msg = 3; -} - -message MutateResult { - uint32 success = 1; - uint32 failure = 2; -} diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index becb8d5690..1ad0fcd157 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -88,6 +88,7 @@ impl SystemCatalogTable { schema: schema.clone(), primary_key_indices: vec![ENTRY_TYPE_INDEX, KEY_INDEX, TIMESTAMP_INDEX], create_if_not_exists: true, + table_options: HashMap::new(), }; let table = engine diff --git a/src/client/src/admin.rs b/src/client/src/admin.rs index 2eada14a2f..7c71d83083 100644 --- a/src/client/src/admin.rs +++ b/src/client/src/admin.rs @@ -1,14 +1,56 @@ use api::v1::*; +use snafu::prelude::*; + +use crate::database::PROTOCOL_VERSION; +use crate::error; +use crate::Client; +use crate::Result; #[derive(Clone, Debug)] pub struct Admin { + name: String, client: Client, } impl Admin { - pub fn new(client: Client) -> Self { - Self { client } + pub fn new(name: impl Into, client: Client) -> Self { + Self { + name: name.into(), + client, + } } - // TODO(jiachun): admin api + pub async fn create(&self, expr: CreateExpr) -> Result { + let header = ExprHeader { + version: PROTOCOL_VERSION, + }; + let expr = AdminExpr { + header: Some(header), + expr: Some(admin_expr::Expr::Create(expr)), + }; + // `remove(0)` is safe because of `do_request`'s invariants. + Ok(self.do_request(vec![expr]).await?.remove(0)) + } + + /// Invariants: the lengths of input vec (`Vec`) and output vec (`Vec`) are equal. + async fn do_request(&self, exprs: Vec) -> Result> { + let expr_count = exprs.len(); + let req = AdminRequest { + name: self.name.clone(), + exprs, + }; + + let resp = self.client.admin(req).await?; + + let results = resp.results; + ensure!( + results.len() == expr_count, + error::MissingResultSnafu { + name: "admin_results", + expected: expr_count, + actual: results.len(), + } + ); + Ok(results) + } } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 009ca7ca1d..647866e4ed 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -1,4 +1,3 @@ -use std::ops::Deref; use std::sync::Arc; use api::v1::codec::SelectResult as GrpcSelectResult; @@ -13,10 +12,9 @@ use common_grpc::DefaultAsPlanImpl; use datafusion::physical_plan::ExecutionPlan; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{self, MissingResultSnafu}; +use crate::error; use crate::{ - error::DatanodeSnafu, error::DecodeSelectSnafu, error::EncodePhysicalSnafu, - error::MissingHeaderSnafu, Client, Result, + error::DatanodeSnafu, error::DecodeSelectSnafu, error::EncodePhysicalSnafu, Client, Result, }; pub const PROTOCOL_VERSION: u32 = 1; @@ -97,36 +95,7 @@ impl Database { }; let obj_result = self.object(expr).await?; - - let header = obj_result.header.context(MissingHeaderSnafu)?; - - if !StatusCode::is_success(header.code) { - return DatanodeSnafu { - code: header.code, - msg: header.err_msg, - } - .fail(); - } - - let obj_result = obj_result.result.context(MissingResultSnafu { - name: "select_result".to_string(), - expected: 1_usize, - actual: 0_usize, - })?; - - let result = match obj_result { - object_result::Result::Select(select) => { - let result = select - .raw_data - .deref() - .try_into() - .context(DecodeSelectSnafu)?; - ObjectResult::Select(result) - } - object_result::Result::Mutate(mutate) => ObjectResult::Mutate(mutate), - }; - - Ok(result) + obj_result.try_into() } // TODO(jiachun) update/delete @@ -165,6 +134,34 @@ pub enum ObjectResult { Mutate(GrpcMutateResult), } +impl TryFrom for ObjectResult { + type Error = error::Error; + + fn try_from(object_result: api::v1::ObjectResult) -> std::result::Result { + let header = object_result.header.context(error::MissingHeaderSnafu)?; + if !StatusCode::is_success(header.code) { + return DatanodeSnafu { + code: header.code, + msg: header.err_msg, + } + .fail(); + } + + let obj_result = object_result.result.context(error::MissingResultSnafu { + name: "result".to_string(), + expected: 1_usize, + actual: 0_usize, + })?; + Ok(match obj_result { + object_result::Result::Select(select) => { + let result = (*select.raw_data).try_into().context(DecodeSelectSnafu)?; + ObjectResult::Select(result) + } + object_result::Result::Mutate(mutate) => ObjectResult::Mutate(mutate), + }) + } +} + pub enum Select { Sql(String), } diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index a28abe64b7..b39ea34e20 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -1,3 +1,4 @@ +pub mod admin; mod client; mod database; mod error; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index d6b45edb97..872387c03a 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -31,6 +31,7 @@ common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } datatypes = { path = "../datatypes" } +futures = "0.3" hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } metrics = "0.20" diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 0ef2930969..1838885d59 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -179,6 +179,9 @@ pub enum Error { #[snafu(backtrace)] source: common_grpc::Error, }, + + #[snafu(display("Invalid ColumnDef in protobuf msg: {}", msg))] + InvalidColumnDef { msg: String, backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -204,7 +207,8 @@ impl ErrorExt for Error { | Error::SqlTypeNotSupported { .. } | Error::CreateSchema { .. } | Error::KeyColumnNotFound { .. } - | Error::ConstraintNotSupported { .. } => StatusCode::InvalidArguments, + | Error::ConstraintNotSupported { .. } + | Error::InvalidColumnDef { .. } => StatusCode::InvalidArguments, // TODO(yingwen): Further categorize http error. Error::StartServer { .. } | Error::ParseAddr { .. } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 9b36ff5860..2485a30602 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,6 +1,9 @@ use std::{fs, path, sync::Arc}; -use api::v1::{object_expr, select_expr, InsertExpr, ObjectExpr, ObjectResult, SelectExpr}; +use api::v1::{ + admin_expr, object_expr, select_expr, AdminExpr, AdminResult, InsertExpr, ObjectExpr, + ObjectResult, SelectExpr, +}; use async_trait::async_trait; use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::BoxedError; @@ -10,7 +13,7 @@ use common_telemetry::timer; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; use object_store::{backend::fs::Backend, util, ObjectStore}; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; -use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler}; +use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler}; use snafu::prelude::*; use sql::statements::statement::Statement; use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; @@ -277,3 +280,19 @@ impl GrpcQueryHandler for Instance { Ok(object_resp) } } + +#[async_trait] +impl GrpcAdminHandler for Instance { + async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result { + let admin_resp = match expr.expr { + Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await, + other => { + return servers::error::NotSupportedSnafu { + feat: format!("{:?}", other), + } + .fail(); + } + }; + Ok(admin_resp) + } +} diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 7327c29cd1..26d0faae46 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -33,7 +33,7 @@ impl Services { ); Ok(Self { http_server: HttpServer::new(instance.clone()), - grpc_server: GrpcServer::new(instance.clone()), + grpc_server: GrpcServer::new(instance.clone(), instance.clone()), mysql_server: MysqlServer::create_server(instance, mysql_io_runtime), }) } diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index bb48357e44..c6c9ffb732 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -1,3 +1,4 @@ +mod create; pub(crate) mod handler; pub(crate) mod insert; pub(crate) mod plan; diff --git a/src/datanode/src/server/grpc/create.rs b/src/datanode/src/server/grpc/create.rs new file mode 100644 index 0000000000..dfa936086b --- /dev/null +++ b/src/datanode/src/server/grpc/create.rs @@ -0,0 +1,262 @@ +use std::sync::Arc; + +use api::v1::{AdminResult, ColumnDataType, ColumnDef, CreateExpr}; +use common_error::prelude::{ErrorExt, StatusCode}; +use datatypes::prelude::*; +use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; +use futures::TryFutureExt; +use query::Output; +use snafu::prelude::*; +use table::requests::CreateTableRequest; + +use crate::error::{self, Result}; +use crate::instance::Instance; +use crate::server::grpc::handler::AdminResultBuilder; +use crate::sql::SqlRequest; + +impl Instance { + pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult { + let request = self.create_expr_to_request(expr); + let result = futures::future::ready(request) + .and_then(|request| self.sql_handler().execute(SqlRequest::Create(request))) + .await; + match result { + Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + // Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug. + Ok(Output::RecordBatch(_)) => unreachable!(), + Err(err) => AdminResultBuilder::default() + .status_code(err.status_code() as u32) + .err_msg(err.to_string()) + .build(), + } + } + + fn create_expr_to_request(&self, expr: CreateExpr) -> Result { + let schema = create_table_schema(&expr)?; + + let primary_key_indices = expr + .primary_keys + .iter() + .map(|key| { + schema + .column_index_by_name(key) + .context(error::KeyColumnNotFoundSnafu { name: key }) + }) + .collect::>>()?; + + let table_id = self.catalog_manager().next_table_id(); + + Ok(CreateTableRequest { + id: table_id, + catalog_name: expr.catalog_name, + schema_name: expr.schema_name, + table_name: expr.table_name, + desc: expr.desc, + schema, + primary_key_indices, + create_if_not_exists: expr.create_if_not_exists, + table_options: expr.table_options, + }) + } +} + +fn create_table_schema(expr: &CreateExpr) -> Result { + let column_schemas = expr + .column_defs + .iter() + .map(create_column_schema) + .collect::>>()?; + let ts_index = column_schemas + .iter() + .enumerate() + .find_map(|(i, column)| { + if column.name == expr.time_index { + Some(i) + } else { + None + } + }) + .context(error::KeyColumnNotFoundSnafu { + name: &expr.time_index, + })?; + Ok(Arc::new( + SchemaBuilder::from(column_schemas) + .timestamp_index(ts_index) + .build() + .context(error::CreateSchemaSnafu)?, + )) +} + +fn create_column_schema(column_def: &ColumnDef) -> Result { + let data_type = + ColumnDataType::from_i32(column_def.data_type).context(error::InvalidColumnDefSnafu { + msg: format!("unknown ColumnDataType {}", column_def.data_type), + })?; + let data_type = match data_type { + ColumnDataType::Boolean => ConcreteDataType::boolean_datatype(), + ColumnDataType::Int8 => ConcreteDataType::int8_datatype(), + ColumnDataType::Int16 => ConcreteDataType::int16_datatype(), + ColumnDataType::Int32 => ConcreteDataType::int32_datatype(), + ColumnDataType::Int64 => ConcreteDataType::int64_datatype(), + ColumnDataType::Uint8 => ConcreteDataType::uint8_datatype(), + ColumnDataType::Uint16 => ConcreteDataType::uint16_datatype(), + ColumnDataType::Uint32 => ConcreteDataType::uint32_datatype(), + ColumnDataType::Uint64 => ConcreteDataType::uint64_datatype(), + ColumnDataType::Float32 => ConcreteDataType::float32_datatype(), + ColumnDataType::Float64 => ConcreteDataType::float64_datatype(), + ColumnDataType::Binary => ConcreteDataType::binary_datatype(), + ColumnDataType::String => ConcreteDataType::string_datatype(), + ColumnDataType::Date => ConcreteDataType::date_datatype(), + ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(), + }; + Ok(ColumnSchema { + name: column_def.name.clone(), + data_type, + is_nullable: column_def.is_nullable, + }) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use crate::tests::test_util; + + #[tokio::test] + async fn test_create_expr_to_request() { + let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts).await.unwrap(); + instance.start().await.unwrap(); + + let expr = testing_create_expr(); + let request = instance.create_expr_to_request(expr).unwrap(); + assert_eq!(request.id, 1); + assert_eq!(request.catalog_name, None); + assert_eq!(request.schema_name, None); + assert_eq!(request.table_name, "my-metrics"); + assert_eq!(request.desc, Some("blabla".to_string())); + assert_eq!(request.schema, expected_table_schema()); + assert_eq!(request.primary_key_indices, vec![1, 0]); + assert!(request.create_if_not_exists); + + let mut expr = testing_create_expr(); + expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; + let result = instance.create_expr_to_request(expr); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Specified timestamp key or primary key column not found: not-exist-column")); + } + + #[test] + fn test_create_table_schema() { + let mut expr = testing_create_expr(); + let schema = create_table_schema(&expr).unwrap(); + assert_eq!(schema, expected_table_schema()); + + expr.time_index = "not-exist-column".to_string(); + let result = create_table_schema(&expr); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Specified timestamp key or primary key column not found: not-exist-column")); + } + + #[test] + fn test_create_column_schema() { + let column_def = ColumnDef { + name: "a".to_string(), + data_type: 1024, + is_nullable: true, + }; + let result = create_column_schema(&column_def); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Invalid ColumnDef in protobuf msg: unknown ColumnDataType 1024")); + + let column_def = ColumnDef { + name: "a".to_string(), + data_type: 12, // string + is_nullable: true, + }; + let column_schema = create_column_schema(&column_def).unwrap(); + assert_eq!(column_schema.name, "a"); + assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); + assert!(column_schema.is_nullable); + } + + fn testing_create_expr() -> CreateExpr { + let column_defs = vec![ + ColumnDef { + name: "host".to_string(), + data_type: 12, // string + is_nullable: false, + }, + ColumnDef { + name: "ts".to_string(), + data_type: 4, // int64 + is_nullable: false, + }, + ColumnDef { + name: "cpu".to_string(), + data_type: 9, // float32 + is_nullable: true, + }, + ColumnDef { + name: "memory".to_string(), + data_type: 10, // float64 + is_nullable: true, + }, + ]; + CreateExpr { + catalog_name: None, + schema_name: None, + table_name: "my-metrics".to_string(), + desc: Some("blabla".to_string()), + column_defs, + time_index: "ts".to_string(), + primary_keys: vec!["ts".to_string(), "host".to_string()], + create_if_not_exists: true, + table_options: HashMap::new(), + } + } + + fn expected_table_schema() -> SchemaRef { + let column_schemas = vec![ + ColumnSchema { + name: "host".to_string(), + data_type: ConcreteDataType::string_datatype(), + is_nullable: false, + }, + ColumnSchema { + name: "ts".to_string(), + data_type: ConcreteDataType::int64_datatype(), + is_nullable: false, + }, + ColumnSchema { + name: "cpu".to_string(), + data_type: ConcreteDataType::float32_datatype(), + is_nullable: true, + }, + ColumnSchema { + name: "memory".to_string(), + data_type: ConcreteDataType::float64_datatype(), + is_nullable: true, + }, + ]; + Arc::new( + SchemaBuilder::from(column_schemas) + .timestamp_index(1) + .build() + .unwrap(), + ) + } +} diff --git a/src/datanode/src/server/grpc/handler.rs b/src/datanode/src/server/grpc/handler.rs index b4229981a5..c90c5f8b10 100644 --- a/src/datanode/src/server/grpc/handler.rs +++ b/src/datanode/src/server/grpc/handler.rs @@ -1,6 +1,6 @@ use api::v1::{ - codec::SelectResult, object_result, MutateResult, ObjectResult, ResultHeader, - SelectResult as SelectResultRaw, + admin_result, codec::SelectResult, object_result, AdminResult, MutateResult, ObjectResult, + ResultHeader, SelectResult as SelectResultRaw, }; use common_error::prelude::ErrorExt; @@ -87,6 +87,61 @@ pub(crate) fn build_err_result(err: &impl ErrorExt) -> ObjectResult { .build() } +#[derive(Debug)] +pub(crate) struct AdminResultBuilder { + version: u32, + code: u32, + err_msg: Option, + mutate: Option<(Success, Failure)>, +} + +impl AdminResultBuilder { + pub fn status_code(mut self, code: u32) -> Self { + self.code = code; + self + } + + pub fn err_msg(mut self, err_msg: String) -> Self { + self.err_msg = Some(err_msg); + self + } + + pub fn mutate_result(mut self, success: u32, failure: u32) -> Self { + self.mutate = Some((success, failure)); + self + } + + pub fn build(self) -> AdminResult { + let header = Some(ResultHeader { + version: self.version, + code: self.code, + err_msg: self.err_msg.unwrap_or_default(), + }); + + let result = if let Some((success, failure)) = self.mutate { + Some(admin_result::Result::Mutate(MutateResult { + success, + failure, + })) + } else { + None + }; + + AdminResult { header, result } + } +} + +impl Default for AdminResultBuilder { + fn default() -> Self { + Self { + version: PROTOCOL_VERSION, + code: 0, + err_msg: None, + mutate: None, + } + } +} + #[cfg(test)] mod tests { use api::v1::{object_result, MutateResult}; diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index f933a7cbe4..5f8924762a 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -150,6 +150,7 @@ impl SqlHandler { schema, primary_key_indices: primary_keys, create_if_not_exists: stmt.if_not_exists, + table_options: HashMap::new(), }; Ok(request) } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 4c321a7d11..3678ed339b 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -1,4 +1,4 @@ mod grpc_test; mod http_test; mod instance_test; -mod test_util; +pub(crate) mod test_util; diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 1d3d180c09..3c6a01e018 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -1,8 +1,13 @@ +use std::assert_matches::assert_matches; +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use api::v1::{codec::InsertBatch, column, Column}; +use api::v1::{ + admin_result, codec::InsertBatch, column, Column, ColumnDef, CreateExpr, MutateResult, +}; +use client::admin::Admin; use client::{Client, Database, ObjectResult}; use servers::grpc::GrpcServer; use servers::server::Server; @@ -18,10 +23,8 @@ async fn test_insert_and_select() { let instance = Arc::new(Instance::new(&opts).await.unwrap()); instance.start().await.unwrap(); - test_util::create_test_table(&instance).await.unwrap(); - tokio::spawn(async move { - let mut grpc_server = GrpcServer::new(instance); + let mut grpc_server = GrpcServer::new(instance.clone(), instance); let addr = "127.0.0.1:3001".parse::().unwrap(); grpc_server.start(addr).await.unwrap() }); @@ -30,7 +33,8 @@ async fn test_insert_and_select() { tokio::time::sleep(Duration::from_secs(1)).await; let grpc_client = Client::connect("http://127.0.0.1:3001").await.unwrap(); - let db = Database::new("greptime", grpc_client); + let db = Database::new("greptime", grpc_client.clone()); + let admin = Admin::new("greptime", grpc_client); // testing data: let expected_host_col = Column { @@ -71,6 +75,17 @@ async fn test_insert_and_select() { ..Default::default() }; + // create + let expr = testing_create_expr(); + let result = admin.create(expr).await.unwrap(); + assert_matches!( + result.result, + Some(admin_result::Result::Mutate(MutateResult { + success: 1, + failure: 0 + })) + ); + // insert let values = vec![InsertBatch { columns: vec![ @@ -112,3 +127,39 @@ async fn test_insert_and_select() { _ => unreachable!(), } } + +fn testing_create_expr() -> CreateExpr { + let column_defs = vec![ + ColumnDef { + name: "host".to_string(), + data_type: 12, // string + is_nullable: false, + }, + ColumnDef { + name: "cpu".to_string(), + data_type: 10, // float64 + is_nullable: true, + }, + ColumnDef { + name: "memory".to_string(), + data_type: 10, // float64 + is_nullable: true, + }, + ColumnDef { + name: "ts".to_string(), + data_type: 4, // int64 + is_nullable: true, + }, + ]; + CreateExpr { + catalog_name: None, + schema_name: None, + table_name: "demo".to_string(), + desc: Some("blabla".to_string()), + column_defs, + time_index: "ts".to_string(), + primary_keys: vec!["ts".to_string(), "host".to_string()], + create_if_not_exists: true, + table_options: HashMap::new(), + } +} diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 888e70bbb9..110bd1e4c3 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -66,6 +67,7 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> { ), create_if_not_exists: true, primary_key_indices: Vec::default(), + table_options: HashMap::new(), }, ) .await diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index b5bc22b4cd..6410cbce0e 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -12,21 +12,25 @@ use tonic::{Request, Response, Status}; use crate::error::{Result, StartGrpcSnafu, TcpBindSnafu}; use crate::grpc::handler::BatchHandler; -use crate::query_handler::GrpcQueryHandlerRef; +use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef}; use crate::server::Server; pub struct GrpcServer { query_handler: GrpcQueryHandlerRef, + admin_handler: GrpcAdminHandlerRef, } impl GrpcServer { - pub fn new(query_handler: GrpcQueryHandlerRef) -> Self { - Self { query_handler } + pub fn new(query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef) -> Self { + Self { + query_handler, + admin_handler, + } } pub fn create_service(&self) -> greptime_server::GreptimeServer { let service = GrpcService { - handler: BatchHandler::new(self.query_handler.clone()), + handler: BatchHandler::new(self.query_handler.clone(), self.admin_handler.clone()), }; greptime_server::GreptimeServer::new(service) } diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs index 82d91eee34..264e37bd87 100644 --- a/src/servers/src/grpc/handler.rs +++ b/src/servers/src/grpc/handler.rs @@ -1,22 +1,35 @@ -use api::v1::{BatchRequest, BatchResponse, DatabaseResponse}; +use api::v1::{AdminResponse, BatchRequest, BatchResponse, DatabaseResponse}; use crate::error::Result; -use crate::query_handler::GrpcQueryHandlerRef; +use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef}; #[derive(Clone)] pub struct BatchHandler { query_handler: GrpcQueryHandlerRef, + admin_handler: GrpcAdminHandlerRef, } impl BatchHandler { - pub fn new(query_handler: GrpcQueryHandlerRef) -> Self { - Self { query_handler } + pub fn new(query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef) -> Self { + Self { + query_handler, + admin_handler, + } } pub async fn batch(&self, batch_req: BatchRequest) -> Result { let mut batch_resp = BatchResponse::default(); + let mut admin_resp = AdminResponse::default(); let mut db_resp = DatabaseResponse::default(); + for admin_req in batch_req.admins { + for admin_expr in admin_req.exprs { + let admin_result = self.admin_handler.exec_admin_request(admin_expr).await?; + admin_resp.results.push(admin_result); + } + } + batch_resp.admins.push(admin_resp); + for db_req in batch_req.databases { for obj_expr in db_req.exprs { let object_resp = self.query_handler.do_query(obj_expr).await?; diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index d42862dc82..e7cbb7f8c6 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use api::v1::{ObjectExpr, ObjectResult}; +use api::v1::{AdminExpr, AdminResult, ObjectExpr, ObjectResult}; use async_trait::async_trait; use query::Output; @@ -18,6 +18,7 @@ use crate::error::Result; pub type SqlQueryHandlerRef = Arc; pub type GrpcQueryHandlerRef = Arc; +pub type GrpcAdminHandlerRef = Arc; #[async_trait] pub trait SqlQueryHandler { @@ -29,3 +30,8 @@ pub trait SqlQueryHandler { pub trait GrpcQueryHandler { async fn do_query(&self, query: ObjectExpr) -> Result; } + +#[async_trait] +pub trait GrpcAdminHandler { + async fn exec_admin_request(&self, expr: AdminExpr) -> Result; +} diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index ce041f5fd7..7c5540773b 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -525,6 +525,7 @@ mod tests { create_if_not_exists: true, desc: None, primary_key_indices: Vec::default(), + table_options: HashMap::new(), }; let created_table = table_engine.create_table(&ctx, request).await.unwrap(); @@ -547,6 +548,7 @@ mod tests { create_if_not_exists: false, desc: None, primary_key_indices: Vec::default(), + table_options: HashMap::new(), }; let result = table_engine.create_table(&ctx, request).await; diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index 2bb4b50295..99b1195285 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -1,5 +1,6 @@ mod mock_engine; +use std::collections::HashMap; use std::sync::Arc; use datatypes::prelude::ConcreteDataType; @@ -95,6 +96,7 @@ pub async fn setup_test_engine_and_table() -> ( schema: schema.clone(), create_if_not_exists: true, primary_key_indices: Vec::default(), + table_options: HashMap::new(), }, ) .await @@ -126,6 +128,7 @@ pub async fn setup_mock_engine_and_table( schema: schema.clone(), create_if_not_exists: true, primary_key_indices: Vec::default(), + table_options: HashMap::new(), }, ) .await diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 4e640ae3ba..b7e5a7c9c5 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -24,6 +24,7 @@ pub struct CreateTableRequest { pub schema: SchemaRef, pub primary_key_indices: Vec, pub create_if_not_exists: bool, + pub table_options: HashMap, } /// Open table request