feat: create table through GRPC interface (#224)

* feat: create table through GRPC interface

* move `CreateExpr` `oneof` expr of `AdminExpr` in `admin.proto`, and implement the admin GRPC interface

* add `table_options` and `partition_options` to `CreateExpr`

* resolve code review comments

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-09-06 12:51:07 +08:00
committed by GitHub
parent 3f9144a2e3
commit 119ff2fc2e
26 changed files with 605 additions and 77 deletions

View File

@@ -31,6 +31,7 @@ common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datatypes = { path = "../datatypes" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
log-store = { path = "../log-store" }
metrics = "0.20"

View File

@@ -179,6 +179,9 @@ pub enum Error {
#[snafu(backtrace)]
source: common_grpc::Error,
},
#[snafu(display("Invalid ColumnDef in protobuf msg: {}", msg))]
InvalidColumnDef { msg: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -204,7 +207,8 @@ impl ErrorExt for Error {
| Error::SqlTypeNotSupported { .. }
| Error::CreateSchema { .. }
| Error::KeyColumnNotFound { .. }
| Error::ConstraintNotSupported { .. } => StatusCode::InvalidArguments,
| Error::ConstraintNotSupported { .. }
| Error::InvalidColumnDef { .. } => StatusCode::InvalidArguments,
// TODO(yingwen): Further categorize http error.
Error::StartServer { .. }
| Error::ParseAddr { .. }

View File

@@ -1,6 +1,9 @@
use std::{fs, path, sync::Arc};
use api::v1::{object_expr, select_expr, InsertExpr, ObjectExpr, ObjectResult, SelectExpr};
use api::v1::{
admin_expr, object_expr, select_expr, AdminExpr, AdminResult, InsertExpr, ObjectExpr,
ObjectResult, SelectExpr,
};
use async_trait::async_trait;
use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
@@ -10,7 +13,7 @@ use common_telemetry::timer;
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use object_store::{backend::fs::Backend, util, ObjectStore};
use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler};
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler};
use snafu::prelude::*;
use sql::statements::statement::Statement;
use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
@@ -277,3 +280,19 @@ impl GrpcQueryHandler for Instance {
Ok(object_resp)
}
}
#[async_trait]
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result<AdminResult> {
let admin_resp = match expr.expr {
Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),
}
.fail();
}
};
Ok(admin_resp)
}
}

View File

@@ -33,7 +33,7 @@ impl Services {
);
Ok(Self {
http_server: HttpServer::new(instance.clone()),
grpc_server: GrpcServer::new(instance.clone()),
grpc_server: GrpcServer::new(instance.clone(), instance.clone()),
mysql_server: MysqlServer::create_server(instance, mysql_io_runtime),
})
}

View File

@@ -1,3 +1,4 @@
mod create;
pub(crate) mod handler;
pub(crate) mod insert;
pub(crate) mod plan;

View File

@@ -0,0 +1,262 @@
use std::sync::Arc;
use api::v1::{AdminResult, ColumnDataType, ColumnDef, CreateExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use futures::TryFutureExt;
use query::Output;
use snafu::prelude::*;
use table::requests::CreateTableRequest;
use crate::error::{self, Result};
use crate::instance::Instance;
use crate::server::grpc::handler::AdminResultBuilder;
use crate::sql::SqlRequest;
impl Instance {
pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
let request = self.create_expr_to_request(expr);
let result = futures::future::ready(request)
.and_then(|request| self.sql_handler().execute(SqlRequest::Create(request)))
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
// Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug.
Ok(Output::RecordBatch(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
fn create_expr_to_request(&self, expr: CreateExpr) -> Result<CreateTableRequest> {
let schema = create_table_schema(&expr)?;
let primary_key_indices = expr
.primary_keys
.iter()
.map(|key| {
schema
.column_index_by_name(key)
.context(error::KeyColumnNotFoundSnafu { name: key })
})
.collect::<Result<Vec<usize>>>()?;
let table_id = self.catalog_manager().next_table_id();
Ok(CreateTableRequest {
id: table_id,
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
desc: expr.desc,
schema,
primary_key_indices,
create_if_not_exists: expr.create_if_not_exists,
table_options: expr.table_options,
})
}
}
fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
let column_schemas = expr
.column_defs
.iter()
.map(create_column_schema)
.collect::<Result<Vec<ColumnSchema>>>()?;
let ts_index = column_schemas
.iter()
.enumerate()
.find_map(|(i, column)| {
if column.name == expr.time_index {
Some(i)
} else {
None
}
})
.context(error::KeyColumnNotFoundSnafu {
name: &expr.time_index,
})?;
Ok(Arc::new(
SchemaBuilder::from(column_schemas)
.timestamp_index(ts_index)
.build()
.context(error::CreateSchemaSnafu)?,
))
}
fn create_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
let data_type =
ColumnDataType::from_i32(column_def.data_type).context(error::InvalidColumnDefSnafu {
msg: format!("unknown ColumnDataType {}", column_def.data_type),
})?;
let data_type = match data_type {
ColumnDataType::Boolean => ConcreteDataType::boolean_datatype(),
ColumnDataType::Int8 => ConcreteDataType::int8_datatype(),
ColumnDataType::Int16 => ConcreteDataType::int16_datatype(),
ColumnDataType::Int32 => ConcreteDataType::int32_datatype(),
ColumnDataType::Int64 => ConcreteDataType::int64_datatype(),
ColumnDataType::Uint8 => ConcreteDataType::uint8_datatype(),
ColumnDataType::Uint16 => ConcreteDataType::uint16_datatype(),
ColumnDataType::Uint32 => ConcreteDataType::uint32_datatype(),
ColumnDataType::Uint64 => ConcreteDataType::uint64_datatype(),
ColumnDataType::Float32 => ConcreteDataType::float32_datatype(),
ColumnDataType::Float64 => ConcreteDataType::float64_datatype(),
ColumnDataType::Binary => ConcreteDataType::binary_datatype(),
ColumnDataType::String => ConcreteDataType::string_datatype(),
ColumnDataType::Date => ConcreteDataType::date_datatype(),
ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(),
};
Ok(ColumnSchema {
name: column_def.name.clone(),
data_type,
is_nullable: column_def.is_nullable,
})
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::*;
use crate::tests::test_util;
#[tokio::test]
async fn test_create_expr_to_request() {
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts();
let instance = Instance::new(&opts).await.unwrap();
instance.start().await.unwrap();
let expr = testing_create_expr();
let request = instance.create_expr_to_request(expr).unwrap();
assert_eq!(request.id, 1);
assert_eq!(request.catalog_name, None);
assert_eq!(request.schema_name, None);
assert_eq!(request.table_name, "my-metrics");
assert_eq!(request.desc, Some("blabla".to_string()));
assert_eq!(request.schema, expected_table_schema());
assert_eq!(request.primary_key_indices, vec![1, 0]);
assert!(request.create_if_not_exists);
let mut expr = testing_create_expr();
expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()];
let result = instance.create_expr_to_request(expr);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Specified timestamp key or primary key column not found: not-exist-column"));
}
#[test]
fn test_create_table_schema() {
let mut expr = testing_create_expr();
let schema = create_table_schema(&expr).unwrap();
assert_eq!(schema, expected_table_schema());
expr.time_index = "not-exist-column".to_string();
let result = create_table_schema(&expr);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Specified timestamp key or primary key column not found: not-exist-column"));
}
#[test]
fn test_create_column_schema() {
let column_def = ColumnDef {
name: "a".to_string(),
data_type: 1024,
is_nullable: true,
};
let result = create_column_schema(&column_def);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Invalid ColumnDef in protobuf msg: unknown ColumnDataType 1024"));
let column_def = ColumnDef {
name: "a".to_string(),
data_type: 12, // string
is_nullable: true,
};
let column_schema = create_column_schema(&column_def).unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable);
}
fn testing_create_expr() -> CreateExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
data_type: 12, // string
is_nullable: false,
},
ColumnDef {
name: "ts".to_string(),
data_type: 4, // int64
is_nullable: false,
},
ColumnDef {
name: "cpu".to_string(),
data_type: 9, // float32
is_nullable: true,
},
ColumnDef {
name: "memory".to_string(),
data_type: 10, // float64
is_nullable: true,
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
table_name: "my-metrics".to_string(),
desc: Some("blabla".to_string()),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: HashMap::new(),
}
}
fn expected_table_schema() -> SchemaRef {
let column_schemas = vec![
ColumnSchema {
name: "host".to_string(),
data_type: ConcreteDataType::string_datatype(),
is_nullable: false,
},
ColumnSchema {
name: "ts".to_string(),
data_type: ConcreteDataType::int64_datatype(),
is_nullable: false,
},
ColumnSchema {
name: "cpu".to_string(),
data_type: ConcreteDataType::float32_datatype(),
is_nullable: true,
},
ColumnSchema {
name: "memory".to_string(),
data_type: ConcreteDataType::float64_datatype(),
is_nullable: true,
},
];
Arc::new(
SchemaBuilder::from(column_schemas)
.timestamp_index(1)
.build()
.unwrap(),
)
}
}

View File

@@ -1,6 +1,6 @@
use api::v1::{
codec::SelectResult, object_result, MutateResult, ObjectResult, ResultHeader,
SelectResult as SelectResultRaw,
admin_result, codec::SelectResult, object_result, AdminResult, MutateResult, ObjectResult,
ResultHeader, SelectResult as SelectResultRaw,
};
use common_error::prelude::ErrorExt;
@@ -87,6 +87,61 @@ pub(crate) fn build_err_result(err: &impl ErrorExt) -> ObjectResult {
.build()
}
#[derive(Debug)]
pub(crate) struct AdminResultBuilder {
version: u32,
code: u32,
err_msg: Option<String>,
mutate: Option<(Success, Failure)>,
}
impl AdminResultBuilder {
pub fn status_code(mut self, code: u32) -> Self {
self.code = code;
self
}
pub fn err_msg(mut self, err_msg: String) -> Self {
self.err_msg = Some(err_msg);
self
}
pub fn mutate_result(mut self, success: u32, failure: u32) -> Self {
self.mutate = Some((success, failure));
self
}
pub fn build(self) -> AdminResult {
let header = Some(ResultHeader {
version: self.version,
code: self.code,
err_msg: self.err_msg.unwrap_or_default(),
});
let result = if let Some((success, failure)) = self.mutate {
Some(admin_result::Result::Mutate(MutateResult {
success,
failure,
}))
} else {
None
};
AdminResult { header, result }
}
}
impl Default for AdminResultBuilder {
fn default() -> Self {
Self {
version: PROTOCOL_VERSION,
code: 0,
err_msg: None,
mutate: None,
}
}
}
#[cfg(test)]
mod tests {
use api::v1::{object_result, MutateResult};

View File

@@ -150,6 +150,7 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
schema,
primary_key_indices: primary_keys,
create_if_not_exists: stmt.if_not_exists,
table_options: HashMap::new(),
};
Ok(request)
}

View File

@@ -1,4 +1,4 @@
mod grpc_test;
mod http_test;
mod instance_test;
mod test_util;
pub(crate) mod test_util;

View File

@@ -1,8 +1,13 @@
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::{codec::InsertBatch, column, Column};
use api::v1::{
admin_result, codec::InsertBatch, column, Column, ColumnDef, CreateExpr, MutateResult,
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
use servers::grpc::GrpcServer;
use servers::server::Server;
@@ -18,10 +23,8 @@ async fn test_insert_and_select() {
let instance = Arc::new(Instance::new(&opts).await.unwrap());
instance.start().await.unwrap();
test_util::create_test_table(&instance).await.unwrap();
tokio::spawn(async move {
let mut grpc_server = GrpcServer::new(instance);
let mut grpc_server = GrpcServer::new(instance.clone(), instance);
let addr = "127.0.0.1:3001".parse::<SocketAddr>().unwrap();
grpc_server.start(addr).await.unwrap()
});
@@ -30,7 +33,8 @@ async fn test_insert_and_select() {
tokio::time::sleep(Duration::from_secs(1)).await;
let grpc_client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let db = Database::new("greptime", grpc_client);
let db = Database::new("greptime", grpc_client.clone());
let admin = Admin::new("greptime", grpc_client);
// testing data:
let expected_host_col = Column {
@@ -71,6 +75,17 @@ async fn test_insert_and_select() {
..Default::default()
};
// create
let expr = testing_create_expr();
let result = admin.create(expr).await.unwrap();
assert_matches!(
result.result,
Some(admin_result::Result::Mutate(MutateResult {
success: 1,
failure: 0
}))
);
// insert
let values = vec![InsertBatch {
columns: vec![
@@ -112,3 +127,39 @@ async fn test_insert_and_select() {
_ => unreachable!(),
}
}
fn testing_create_expr() -> CreateExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
data_type: 12, // string
is_nullable: false,
},
ColumnDef {
name: "cpu".to_string(),
data_type: 10, // float64
is_nullable: true,
},
ColumnDef {
name: "memory".to_string(),
data_type: 10, // float64
is_nullable: true,
},
ColumnDef {
name: "ts".to_string(),
data_type: 4, // int64
is_nullable: true,
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
table_name: "demo".to_string(),
desc: Some("blabla".to_string()),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: HashMap::new(),
}
}

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -66,6 +67,7 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> {
),
create_if_not_exists: true,
primary_key_indices: Vec::default(),
table_options: HashMap::new(),
},
)
.await