refactor: datanode instance (#316)

* refactor: datanode Instance

* fix: resolve todo
This commit is contained in:
dennis zhuang
2022-10-19 10:51:45 +08:00
committed by GitHub
parent c6d91edb83
commit 94b263c261
3 changed files with 312 additions and 289 deletions

View File

@@ -1,39 +1,23 @@
use std::{fs, path, sync::Arc};
use api::v1::{
admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult,
ObjectExpr, ObjectResult, SelectExpr,
};
use async_trait::async_trait;
use catalog::{CatalogManagerRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_telemetry::logging::{debug, error, info};
use common_telemetry::timer;
use catalog::CatalogManagerRef;
use common_telemetry::logging::info;
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use object_store::{services::fs::Builder, util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler};
use snafu::prelude::*;
use sql::statements::statement::Statement;
use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
use table::requests::AddColumnRequest;
use table_engine::config::EngineConfig as TableEngineConfig;
use table_engine::engine::MitoEngine;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, TableNotFoundSnafu,
UnsupportedExprSnafu,
};
use crate::metric;
use crate::error::{self, NewCatalogSnafu, Result};
use crate::script::ScriptExecutor;
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::insert::{self, insertion_expr_to_request};
use crate::server::grpc::plan::PhysicalPlanner;
use crate::server::grpc::select::to_object_result;
use crate::sql::{SqlHandler, SqlRequest};
use crate::sql::SqlHandler;
mod grpc;
mod sql;
type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
@@ -81,164 +65,6 @@ impl Instance {
})
}
async fn add_new_columns_to_table(
&self,
table_name: &str,
add_columns: Vec<AddColumnRequest>,
) -> Result<()> {
let column_names = add_columns
.iter()
.map(|req| req.column_schema.name.clone())
.collect::<Vec<_>>();
let alter_request = insert::build_alter_table_request(table_name, add_columns);
debug!(
"Adding new columns: {:?} to table: {}",
column_names, table_name
);
let _result = self
.sql_handler()
.execute(SqlRequest::Alter(alter_request))
.await?;
info!(
"Added new columns: {:?} to table: {}",
column_names, table_name
);
Ok(())
}
async fn create_table_by_insert_batches(
&self,
table_name: &str,
insert_batches: &[InsertBatch],
) -> Result<()> {
// Create table automatically, build schema from data.
let table_id = self.catalog_manager.next_table_id();
let create_table_request =
insert::build_create_table_request(table_id, table_name, insert_batches)?;
info!(
"Try to create table: {} automatically with request: {:?}",
table_name, create_table_request,
);
let _result = self
.sql_handler()
.execute(SqlRequest::Create(create_table_request))
.await?;
info!("Success to create table: {} automatically", table_name);
Ok(())
}
pub async fn execute_grpc_insert(
&self,
table_name: &str,
values: insert_expr::Values,
) -> Result<Output> {
let schema_provider = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
let insert_batches = insert::insert_batches(values.values)?;
ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu);
let table = if let Some(table) = schema_provider.table(table_name) {
let schema = table.schema();
if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? {
self.add_new_columns_to_table(table_name, add_columns)
.await?;
}
table
} else {
self.create_table_by_insert_batches(table_name, &insert_batches)
.await?;
schema_provider
.table(table_name)
.context(TableNotFoundSnafu { table_name })?
};
let insert = insertion_expr_to_request(table_name, insert_batches, table.clone())?;
let affected_rows = table
.insert(insert)
.await
.context(InsertSnafu { table_name })?;
Ok(Output::AffectedRows(affected_rows))
}
pub async fn execute_sql(&self, sql: &str) -> Result<Output> {
let stmt = self
.query_engine
.sql_to_statement(sql)
.context(ExecuteSqlSnafu)?;
match stmt {
Statement::Query(_) => {
let logical_plan = self
.query_engine
.statement_to_plan(stmt)
.context(ExecuteSqlSnafu)?;
self.query_engine
.execute(&logical_plan)
.await
.context(ExecuteSqlSnafu)
}
Statement::Insert(i) => {
let schema_provider = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
let request = self.sql_handler.insert_to_request(schema_provider, *i)?;
self.sql_handler.execute(request).await
}
Statement::Create(c) => {
let table_id = self.catalog_manager.next_table_id();
let _engine_name = c.engine.clone();
// TODO(hl): Select table engine by engine_name
let request = self.sql_handler.create_to_request(table_id, c)?;
let catalog_name = request.catalog_name.clone();
let schema_name = request.schema_name.clone();
let table_name = request.table_name.clone();
let table_id = request.id;
info!(
"Creating table, catalog: {:?}, schema: {:?}, table name: {:?}, table id: {}",
catalog_name, schema_name, table_name, table_id
);
self.sql_handler.execute(SqlRequest::Create(request)).await
}
Statement::Alter(alter_table) => {
let req = self.sql_handler.alter_to_request(alter_table)?;
self.sql_handler.execute(SqlRequest::Alter(req)).await
}
Statement::ShowDatabases(stmt) => {
self.sql_handler
.execute(SqlRequest::ShowDatabases(stmt))
.await
}
Statement::ShowTables(stmt) => {
self.sql_handler.execute(SqlRequest::ShowTables(stmt)).await
}
}
}
pub async fn start(&self) -> Result<()> {
self.catalog_manager
.start()
@@ -247,41 +73,6 @@ impl Instance {
Ok(())
}
async fn handle_insert(&self, table_name: &str, values: insert_expr::Values) -> ObjectResult {
match self.execute_grpc_insert(table_name, values).await {
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Err(err) => {
// TODO(fys): failure count
build_err_result(&err)
}
_ => unreachable!(),
}
}
async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult {
let result = self.do_handle_select(select_expr).await;
to_object_result(result).await
}
async fn do_handle_select(&self, select_expr: SelectExpr) -> Result<Output> {
let expr = select_expr.expr;
match expr {
Some(select_expr::Expr::Sql(sql)) => self.execute_sql(&sql).await,
Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => {
self.physical_planner
.execute(PhysicalPlanner::parse(plan)?, original_ql)
.await
}
_ => UnsupportedExprSnafu {
name: format!("{:?}", expr),
}
.fail(),
}
}
pub fn sql_handler(&self) -> &SqlHandler {
&self.sql_handler
}
@@ -366,76 +157,3 @@ async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result<LocalFile
Ok(log_store)
}
// TODO(LFC): Refactor datanode and frontend instances, separate impl for each query handler.
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(&self, query: &str) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_sql(query)
.await
.map_err(|e| {
error!(e; "Instance failed to execute sql");
BoxedError::new(e)
})
.context(servers::error::ExecuteQuerySnafu { query })
}
async fn insert_script(&self, name: &str, script: &str) -> servers::error::Result<()> {
self.script_executor.insert_script(name, script).await
}
async fn execute_script(&self, name: &str) -> servers::error::Result<Output> {
self.script_executor.execute_script(name).await
}
}
#[async_trait]
impl GrpcQueryHandler for Instance {
async fn do_query(&self, query: ObjectExpr) -> servers::error::Result<ObjectResult> {
let object_resp = match query.expr {
Some(object_expr::Expr::Insert(insert_expr)) => {
let table_name = &insert_expr.table_name;
let expr = insert_expr
.expr
.context(servers::error::InvalidQuerySnafu {
reason: "missing `expr` in `InsertExpr`",
})?;
match expr {
insert_expr::Expr::Values(values) => {
self.handle_insert(table_name, values).await
}
insert_expr::Expr::Sql(sql) => {
let output = self.execute_sql(&sql).await;
to_object_result(output).await
}
}
}
Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),
}
.fail();
}
};
Ok(object_resp)
}
}
#[async_trait]
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result<AdminResult> {
let admin_resp = match expr.expr {
Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await,
Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),
}
.fail();
}
};
Ok(admin_resp)
}
}

View File

@@ -0,0 +1,203 @@
use api::v1::{
admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult,
ObjectExpr, ObjectResult, SelectExpr,
};
use async_trait::async_trait;
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::status_code::StatusCode;
use common_query::Output;
use common_telemetry::logging::{debug, info};
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler};
use snafu::prelude::*;
use table::requests::AddColumnRequest;
use crate::error::{self, InsertSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu};
use crate::instance::Instance;
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::insert::{self, insertion_expr_to_request};
use crate::server::grpc::plan::PhysicalPlanner;
use crate::server::grpc::select::to_object_result;
use crate::sql::SqlRequest;
impl Instance {
async fn add_new_columns_to_table(
&self,
table_name: &str,
add_columns: Vec<AddColumnRequest>,
) -> Result<()> {
let column_names = add_columns
.iter()
.map(|req| req.column_schema.name.clone())
.collect::<Vec<_>>();
let alter_request = insert::build_alter_table_request(table_name, add_columns);
debug!(
"Adding new columns: {:?} to table: {}",
column_names, table_name
);
let _result = self
.sql_handler()
.execute(SqlRequest::Alter(alter_request))
.await?;
info!(
"Added new columns: {:?} to table: {}",
column_names, table_name
);
Ok(())
}
async fn create_table_by_insert_batches(
&self,
table_name: &str,
insert_batches: &[InsertBatch],
) -> Result<()> {
// Create table automatically, build schema from data.
let table_id = self.catalog_manager.next_table_id();
let create_table_request =
insert::build_create_table_request(table_id, table_name, insert_batches)?;
info!(
"Try to create table: {} automatically with request: {:?}",
table_name, create_table_request,
);
let _result = self
.sql_handler()
.execute(SqlRequest::Create(create_table_request))
.await?;
info!("Success to create table: {} automatically", table_name);
Ok(())
}
pub async fn execute_grpc_insert(
&self,
table_name: &str,
values: insert_expr::Values,
) -> Result<Output> {
let schema_provider = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
let insert_batches = insert::insert_batches(values.values)?;
ensure!(!insert_batches.is_empty(), error::IllegalInsertDataSnafu);
let table = if let Some(table) = schema_provider.table(table_name) {
let schema = table.schema();
if let Some(add_columns) = insert::find_new_columns(&schema, &insert_batches)? {
self.add_new_columns_to_table(table_name, add_columns)
.await?;
}
table
} else {
self.create_table_by_insert_batches(table_name, &insert_batches)
.await?;
schema_provider
.table(table_name)
.context(TableNotFoundSnafu { table_name })?
};
let insert = insertion_expr_to_request(table_name, insert_batches, table.clone())?;
let affected_rows = table
.insert(insert)
.await
.context(InsertSnafu { table_name })?;
Ok(Output::AffectedRows(affected_rows))
}
async fn handle_insert(&self, table_name: &str, values: insert_expr::Values) -> ObjectResult {
match self.execute_grpc_insert(table_name, values).await {
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Err(err) => {
// TODO(fys): failure count
build_err_result(&err)
}
_ => unreachable!(),
}
}
async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult {
let result = self.do_handle_select(select_expr).await;
to_object_result(result).await
}
async fn do_handle_select(&self, select_expr: SelectExpr) -> Result<Output> {
let expr = select_expr.expr;
match expr {
Some(select_expr::Expr::Sql(sql)) => self.execute_sql(&sql).await,
Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => {
self.physical_planner
.execute(PhysicalPlanner::parse(plan)?, original_ql)
.await
}
_ => UnsupportedExprSnafu {
name: format!("{:?}", expr),
}
.fail(),
}
}
}
#[async_trait]
impl GrpcQueryHandler for Instance {
async fn do_query(&self, query: ObjectExpr) -> servers::error::Result<ObjectResult> {
let object_resp = match query.expr {
Some(object_expr::Expr::Insert(insert_expr)) => {
let table_name = &insert_expr.table_name;
let expr = insert_expr
.expr
.context(servers::error::InvalidQuerySnafu {
reason: "missing `expr` in `InsertExpr`",
})?;
match expr {
insert_expr::Expr::Values(values) => {
self.handle_insert(table_name, values).await
}
insert_expr::Expr::Sql(sql) => {
let output = self.execute_sql(&sql).await;
to_object_result(output).await
}
}
}
Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),
}
.fail();
}
};
Ok(object_resp)
}
}
#[async_trait]
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result<AdminResult> {
let admin_resp = match expr.expr {
Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await,
Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await,
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),
}
.fail();
}
};
Ok(admin_resp)
}
}

View File

@@ -0,0 +1,102 @@
use async_trait::async_trait;
use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_query::Output;
use common_telemetry::{
logging::{error, info},
timer,
};
use servers::query_handler::SqlQueryHandler;
use snafu::prelude::*;
use sql::statements::statement::Statement;
use crate::error::{ExecuteSqlSnafu, Result};
use crate::instance::Instance;
use crate::metric;
use crate::sql::SqlRequest;
impl Instance {
pub async fn execute_sql(&self, sql: &str) -> Result<Output> {
let stmt = self
.query_engine
.sql_to_statement(sql)
.context(ExecuteSqlSnafu)?;
match stmt {
Statement::Query(_) => {
let logical_plan = self
.query_engine
.statement_to_plan(stmt)
.context(ExecuteSqlSnafu)?;
self.query_engine
.execute(&logical_plan)
.await
.context(ExecuteSqlSnafu)
}
Statement::Insert(i) => {
let schema_provider = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
let request = self.sql_handler.insert_to_request(schema_provider, *i)?;
self.sql_handler.execute(request).await
}
Statement::Create(c) => {
let table_id = self.catalog_manager.next_table_id();
let _engine_name = c.engine.clone();
// TODO(hl): Select table engine by engine_name
let request = self.sql_handler.create_to_request(table_id, c)?;
let catalog_name = request.catalog_name.clone();
let schema_name = request.schema_name.clone();
let table_name = request.table_name.clone();
let table_id = request.id;
info!(
"Creating table, catalog: {:?}, schema: {:?}, table name: {:?}, table id: {}",
catalog_name, schema_name, table_name, table_id
);
self.sql_handler.execute(SqlRequest::Create(request)).await
}
Statement::Alter(alter_table) => {
let req = self.sql_handler.alter_to_request(alter_table)?;
self.sql_handler.execute(SqlRequest::Alter(req)).await
}
Statement::ShowDatabases(stmt) => {
self.sql_handler
.execute(SqlRequest::ShowDatabases(stmt))
.await
}
Statement::ShowTables(stmt) => {
self.sql_handler.execute(SqlRequest::ShowTables(stmt)).await
}
}
}
}
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(&self, query: &str) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
self.execute_sql(query)
.await
.map_err(|e| {
error!(e; "Instance failed to execute sql");
BoxedError::new(e)
})
.context(servers::error::ExecuteQuerySnafu { query })
}
async fn insert_script(&self, name: &str, script: &str) -> servers::error::Result<()> {
self.script_executor.insert_script(name, script).await
}
async fn execute_script(&self, name: &str) -> servers::error::Result<Output> {
self.script_executor.execute_script(name).await
}
}