diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index dc7b27ed76..914edff089 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,39 +1,23 @@ use std::{fs, path, sync::Arc}; -use api::v1::{ - admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, - ObjectExpr, ObjectResult, SelectExpr, -}; -use async_trait::async_trait; -use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_error::prelude::BoxedError; -use common_error::status_code::StatusCode; -use common_query::Output; -use common_telemetry::logging::{debug, error, info}; -use common_telemetry::timer; +use catalog::CatalogManagerRef; +use common_telemetry::logging::info; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; use object_store::{services::fs::Builder, util, ObjectStore}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; -use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler}; use snafu::prelude::*; -use sql::statements::statement::Statement; use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; -use table::requests::AddColumnRequest; use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; -use crate::error::{ - self, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, TableNotFoundSnafu, - UnsupportedExprSnafu, -}; -use crate::metric; +use crate::error::{self, NewCatalogSnafu, Result}; use crate::script::ScriptExecutor; -use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; -use crate::server::grpc::insert::{self, insertion_expr_to_request}; use crate::server::grpc::plan::PhysicalPlanner; -use crate::server::grpc::select::to_object_result; -use crate::sql::{SqlHandler, SqlRequest}; +use crate::sql::SqlHandler; + +mod grpc; +mod sql; type DefaultEngine = MitoEngine>; @@ -81,164 +65,6 @@ impl Instance { }) } - async fn add_new_columns_to_table( - &self, - table_name: &str, - add_columns: Vec, - ) -> Result<()> { - let column_names = add_columns - .iter() - .map(|req| req.column_schema.name.clone()) - .collect::>(); - - let alter_request = insert::build_alter_table_request(table_name, add_columns); - - debug!( - "Adding new columns: {:?} to table: {}", - column_names, table_name - ); - - let _result = self - .sql_handler() - .execute(SqlRequest::Alter(alter_request)) - .await?; - - info!( - "Added new columns: {:?} to table: {}", - column_names, table_name - ); - Ok(()) - } - - async fn create_table_by_insert_batches( - &self, - table_name: &str, - insert_batches: &[InsertBatch], - ) -> Result<()> { - // Create table automatically, build schema from data. - let table_id = self.catalog_manager.next_table_id(); - let create_table_request = - insert::build_create_table_request(table_id, table_name, insert_batches)?; - - info!( - "Try to create table: {} automatically with request: {:?}", - table_name, create_table_request, - ); - - let _result = self - .sql_handler() - .execute(SqlRequest::Create(create_table_request)) - .await?; - - info!("Success to create table: {} automatically", table_name); - - Ok(()) - } - - pub async fn execute_grpc_insert( - &self, - table_name: &str, - values: insert_expr::Values, - ) -> Result { - let schema_provider = self - .catalog_manager - .catalog(DEFAULT_CATALOG_NAME) - .unwrap() - .schema(DEFAULT_SCHEMA_NAME) - .unwrap(); - - let insert_batches = insert::insert_batches(values.values)?; - ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu); - - let table = if let Some(table) = schema_provider.table(table_name) { - let schema = table.schema(); - if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? { - self.add_new_columns_to_table(table_name, add_columns) - .await?; - } - - table - } else { - self.create_table_by_insert_batches(table_name, &insert_batches) - .await?; - - schema_provider - .table(table_name) - .context(TableNotFoundSnafu { table_name })? - }; - - let insert = insertion_expr_to_request(table_name, insert_batches, table.clone())?; - - let affected_rows = table - .insert(insert) - .await - .context(InsertSnafu { table_name })?; - - Ok(Output::AffectedRows(affected_rows)) - } - - pub async fn execute_sql(&self, sql: &str) -> Result { - let stmt = self - .query_engine - .sql_to_statement(sql) - .context(ExecuteSqlSnafu)?; - - match stmt { - Statement::Query(_) => { - let logical_plan = self - .query_engine - .statement_to_plan(stmt) - .context(ExecuteSqlSnafu)?; - - self.query_engine - .execute(&logical_plan) - .await - .context(ExecuteSqlSnafu) - } - Statement::Insert(i) => { - let schema_provider = self - .catalog_manager - .catalog(DEFAULT_CATALOG_NAME) - .unwrap() - .schema(DEFAULT_SCHEMA_NAME) - .unwrap(); - - let request = self.sql_handler.insert_to_request(schema_provider, *i)?; - self.sql_handler.execute(request).await - } - - Statement::Create(c) => { - let table_id = self.catalog_manager.next_table_id(); - let _engine_name = c.engine.clone(); - // TODO(hl): Select table engine by engine_name - - let request = self.sql_handler.create_to_request(table_id, c)?; - let catalog_name = request.catalog_name.clone(); - let schema_name = request.schema_name.clone(); - let table_name = request.table_name.clone(); - let table_id = request.id; - info!( - "Creating table, catalog: {:?}, schema: {:?}, table name: {:?}, table id: {}", - catalog_name, schema_name, table_name, table_id - ); - - self.sql_handler.execute(SqlRequest::Create(request)).await - } - Statement::Alter(alter_table) => { - let req = self.sql_handler.alter_to_request(alter_table)?; - self.sql_handler.execute(SqlRequest::Alter(req)).await - } - Statement::ShowDatabases(stmt) => { - self.sql_handler - .execute(SqlRequest::ShowDatabases(stmt)) - .await - } - Statement::ShowTables(stmt) => { - self.sql_handler.execute(SqlRequest::ShowTables(stmt)).await - } - } - } - pub async fn start(&self) -> Result<()> { self.catalog_manager .start() @@ -247,41 +73,6 @@ impl Instance { Ok(()) } - async fn handle_insert(&self, table_name: &str, values: insert_expr::Values) -> ObjectResult { - match self.execute_grpc_insert(table_name, values).await { - Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - Err(err) => { - // TODO(fys): failure count - build_err_result(&err) - } - _ => unreachable!(), - } - } - - async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult { - let result = self.do_handle_select(select_expr).await; - to_object_result(result).await - } - - async fn do_handle_select(&self, select_expr: SelectExpr) -> Result { - let expr = select_expr.expr; - match expr { - Some(select_expr::Expr::Sql(sql)) => self.execute_sql(&sql).await, - Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => { - self.physical_planner - .execute(PhysicalPlanner::parse(plan)?, original_ql) - .await - } - _ => UnsupportedExprSnafu { - name: format!("{:?}", expr), - } - .fail(), - } - } - pub fn sql_handler(&self) -> &SqlHandler { &self.sql_handler } @@ -366,76 +157,3 @@ async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result servers::error::Result { - let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED); - self.execute_sql(query) - .await - .map_err(|e| { - error!(e; "Instance failed to execute sql"); - BoxedError::new(e) - }) - .context(servers::error::ExecuteQuerySnafu { query }) - } - - async fn insert_script(&self, name: &str, script: &str) -> servers::error::Result<()> { - self.script_executor.insert_script(name, script).await - } - - async fn execute_script(&self, name: &str) -> servers::error::Result { - self.script_executor.execute_script(name).await - } -} - -#[async_trait] -impl GrpcQueryHandler for Instance { - async fn do_query(&self, query: ObjectExpr) -> servers::error::Result { - let object_resp = match query.expr { - Some(object_expr::Expr::Insert(insert_expr)) => { - let table_name = &insert_expr.table_name; - let expr = insert_expr - .expr - .context(servers::error::InvalidQuerySnafu { - reason: "missing `expr` in `InsertExpr`", - })?; - match expr { - insert_expr::Expr::Values(values) => { - self.handle_insert(table_name, values).await - } - insert_expr::Expr::Sql(sql) => { - let output = self.execute_sql(&sql).await; - to_object_result(output).await - } - } - } - Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await, - other => { - return servers::error::NotSupportedSnafu { - feat: format!("{:?}", other), - } - .fail(); - } - }; - Ok(object_resp) - } -} - -#[async_trait] -impl GrpcAdminHandler for Instance { - async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result { - let admin_resp = match expr.expr { - Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await, - Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await, - other => { - return servers::error::NotSupportedSnafu { - feat: format!("{:?}", other), - } - .fail(); - } - }; - Ok(admin_resp) - } -} diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs new file mode 100644 index 0000000000..2cb04767dd --- /dev/null +++ b/src/datanode/src/instance/grpc.rs @@ -0,0 +1,203 @@ +use api::v1::{ + admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, + ObjectExpr, ObjectResult, SelectExpr, +}; +use async_trait::async_trait; +use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::status_code::StatusCode; +use common_query::Output; +use common_telemetry::logging::{debug, info}; +use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler}; +use snafu::prelude::*; +use table::requests::AddColumnRequest; + +use crate::error::{self, InsertSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu}; +use crate::instance::Instance; +use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; +use crate::server::grpc::insert::{self, insertion_expr_to_request}; +use crate::server::grpc::plan::PhysicalPlanner; +use crate::server::grpc::select::to_object_result; +use crate::sql::SqlRequest; + +impl Instance { + async fn add_new_columns_to_table( + &self, + table_name: &str, + add_columns: Vec, + ) -> Result<()> { + let column_names = add_columns + .iter() + .map(|req| req.column_schema.name.clone()) + .collect::>(); + + let alter_request = insert::build_alter_table_request(table_name, add_columns); + + debug!( + "Adding new columns: {:?} to table: {}", + column_names, table_name + ); + + let _result = self + .sql_handler() + .execute(SqlRequest::Alter(alter_request)) + .await?; + + info!( + "Added new columns: {:?} to table: {}", + column_names, table_name + ); + Ok(()) + } + + async fn create_table_by_insert_batches( + &self, + table_name: &str, + insert_batches: &[InsertBatch], + ) -> Result<()> { + // Create table automatically, build schema from data. + let table_id = self.catalog_manager.next_table_id(); + let create_table_request = + insert::build_create_table_request(table_id, table_name, insert_batches)?; + + info!( + "Try to create table: {} automatically with request: {:?}", + table_name, create_table_request, + ); + + let _result = self + .sql_handler() + .execute(SqlRequest::Create(create_table_request)) + .await?; + + info!("Success to create table: {} automatically", table_name); + + Ok(()) + } + + pub async fn execute_grpc_insert( + &self, + table_name: &str, + values: insert_expr::Values, + ) -> Result { + let schema_provider = self + .catalog_manager + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .schema(DEFAULT_SCHEMA_NAME) + .unwrap(); + + let insert_batches = insert::insert_batches(values.values)?; + ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu); + + let table = if let Some(table) = schema_provider.table(table_name) { + let schema = table.schema(); + if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? { + self.add_new_columns_to_table(table_name, add_columns) + .await?; + } + + table + } else { + self.create_table_by_insert_batches(table_name, &insert_batches) + .await?; + + schema_provider + .table(table_name) + .context(TableNotFoundSnafu { table_name })? + }; + + let insert = insertion_expr_to_request(table_name, insert_batches, table.clone())?; + + let affected_rows = table + .insert(insert) + .await + .context(InsertSnafu { table_name })?; + + Ok(Output::AffectedRows(affected_rows)) + } + + async fn handle_insert(&self, table_name: &str, values: insert_expr::Values) -> ObjectResult { + match self.execute_grpc_insert(table_name, values).await { + Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + Err(err) => { + // TODO(fys): failure count + build_err_result(&err) + } + _ => unreachable!(), + } + } + + async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult { + let result = self.do_handle_select(select_expr).await; + to_object_result(result).await + } + + async fn do_handle_select(&self, select_expr: SelectExpr) -> Result { + let expr = select_expr.expr; + match expr { + Some(select_expr::Expr::Sql(sql)) => self.execute_sql(&sql).await, + Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => { + self.physical_planner + .execute(PhysicalPlanner::parse(plan)?, original_ql) + .await + } + _ => UnsupportedExprSnafu { + name: format!("{:?}", expr), + } + .fail(), + } + } +} + +#[async_trait] +impl GrpcQueryHandler for Instance { + async fn do_query(&self, query: ObjectExpr) -> servers::error::Result { + let object_resp = match query.expr { + Some(object_expr::Expr::Insert(insert_expr)) => { + let table_name = &insert_expr.table_name; + let expr = insert_expr + .expr + .context(servers::error::InvalidQuerySnafu { + reason: "missing `expr` in `InsertExpr`", + })?; + match expr { + insert_expr::Expr::Values(values) => { + self.handle_insert(table_name, values).await + } + insert_expr::Expr::Sql(sql) => { + let output = self.execute_sql(&sql).await; + to_object_result(output).await + } + } + } + Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await, + other => { + return servers::error::NotSupportedSnafu { + feat: format!("{:?}", other), + } + .fail(); + } + }; + Ok(object_resp) + } +} + +#[async_trait] +impl GrpcAdminHandler for Instance { + async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result { + let admin_resp = match expr.expr { + Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await, + Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await, + other => { + return servers::error::NotSupportedSnafu { + feat: format!("{:?}", other), + } + .fail(); + } + }; + Ok(admin_resp) + } +} diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs new file mode 100644 index 0000000000..e204a04621 --- /dev/null +++ b/src/datanode/src/instance/sql.rs @@ -0,0 +1,102 @@ +use async_trait::async_trait; +use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::prelude::BoxedError; +use common_query::Output; +use common_telemetry::{ + logging::{error, info}, + timer, +}; +use servers::query_handler::SqlQueryHandler; +use snafu::prelude::*; +use sql::statements::statement::Statement; + +use crate::error::{ExecuteSqlSnafu, Result}; +use crate::instance::Instance; +use crate::metric; +use crate::sql::SqlRequest; + +impl Instance { + pub async fn execute_sql(&self, sql: &str) -> Result { + let stmt = self + .query_engine + .sql_to_statement(sql) + .context(ExecuteSqlSnafu)?; + + match stmt { + Statement::Query(_) => { + let logical_plan = self + .query_engine + .statement_to_plan(stmt) + .context(ExecuteSqlSnafu)?; + + self.query_engine + .execute(&logical_plan) + .await + .context(ExecuteSqlSnafu) + } + Statement::Insert(i) => { + let schema_provider = self + .catalog_manager + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .schema(DEFAULT_SCHEMA_NAME) + .unwrap(); + + let request = self.sql_handler.insert_to_request(schema_provider, *i)?; + self.sql_handler.execute(request).await + } + + Statement::Create(c) => { + let table_id = self.catalog_manager.next_table_id(); + let _engine_name = c.engine.clone(); + // TODO(hl): Select table engine by engine_name + + let request = self.sql_handler.create_to_request(table_id, c)?; + let catalog_name = request.catalog_name.clone(); + let schema_name = request.schema_name.clone(); + let table_name = request.table_name.clone(); + let table_id = request.id; + info!( + "Creating table, catalog: {:?}, schema: {:?}, table name: {:?}, table id: {}", + catalog_name, schema_name, table_name, table_id + ); + + self.sql_handler.execute(SqlRequest::Create(request)).await + } + Statement::Alter(alter_table) => { + let req = self.sql_handler.alter_to_request(alter_table)?; + self.sql_handler.execute(SqlRequest::Alter(req)).await + } + Statement::ShowDatabases(stmt) => { + self.sql_handler + .execute(SqlRequest::ShowDatabases(stmt)) + .await + } + Statement::ShowTables(stmt) => { + self.sql_handler.execute(SqlRequest::ShowTables(stmt)).await + } + } + } +} + +#[async_trait] +impl SqlQueryHandler for Instance { + async fn do_query(&self, query: &str) -> servers::error::Result { + let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED); + self.execute_sql(query) + .await + .map_err(|e| { + error!(e; "Instance failed to execute sql"); + BoxedError::new(e) + }) + .context(servers::error::ExecuteQuerySnafu { query }) + } + + async fn insert_script(&self, name: &str, script: &str) -> servers::error::Result<()> { + self.script_executor.insert_script(name, script).await + } + + async fn execute_script(&self, name: &str) -> servers::error::Result { + self.script_executor.execute_script(name).await + } +}