refactor(frontend): adjust code structure (#1615)

* move  to expr_factory

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move configs into service_config

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move GrpcQueryHandler into distributed.rs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-05-20 02:09:20 +08:00
committed by GitHub
parent 82f2b34f4d
commit c3eeda7d84
16 changed files with 138 additions and 148 deletions

View File

@@ -17,9 +17,8 @@ use std::sync::Arc;
use clap::Parser;
use common_base::Plugins;
use frontend::frontend::FrontendOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::prom::PromOptions;
use frontend::service_config::{InfluxdbOptions, PromOptions};
use meta_client::MetaClientOptions;
use servers::auth::UserProviderRef;
use servers::tls::{TlsMode, TlsOption};
@@ -227,7 +226,7 @@ mod tests {
use std::time::Duration;
use common_test_util::temp_dir::create_named_temp_file;
use frontend::grpc::GrpcOptions;
use frontend::service_config::GrpcOptions;
use servers::auth::{Identity, Password, UserProviderRef};
use super::*;

View File

@@ -21,14 +21,11 @@ use common_telemetry::logging::LoggingOptions;
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig, WalConfig};
use datanode::instance::InstanceRef;
use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prom::PromOptions;
use frontend::prometheus::PrometheusOptions;
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions,
PrometheusOptions,
};
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};

View File

@@ -16,7 +16,11 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{Column, ColumnDataType, CreateTableExpr};
use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, CreateTableExpr, DropColumn,
DropColumns, RenameTable,
};
use common_error::prelude::BoxedError;
use datanode::instance::sql::table_idents_to_full_name;
use datatypes::schema::ColumnSchema;
@@ -25,15 +29,16 @@ use query::sql::prepare_immutable_file_table_files_and_schema;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::ast::{ColumnDef, ColumnOption, TableConstraint};
use sql::statements::column_def_to_schema;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX};
use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def};
use sql::util::to_lowercase_options_map;
use table::requests::{TableOptions, IMMUTABLE_TABLE_META_KEY};
use crate::error::{
self, BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu,
ConvertColumnDefaultConstraintSnafu, IllegalPrimaryKeysDefSnafu, InvalidSqlSnafu,
ParseSqlSnafu, Result,
ConvertColumnDefaultConstraintSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu,
InvalidSqlSnafu, ParseSqlSnafu, Result,
};
pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;
@@ -270,6 +275,50 @@ pub(crate) fn column_schemas_to_defs(
.collect()
}
pub(crate) fn to_alter_expr(
alter_table: AlterTable,
query_ctx: QueryContextRef,
) -> Result<AlterExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(alter_table.table_name(), query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let kind = match alter_table.alter_operation() {
AlterTableOperation::AddConstraint(_) => {
return error::NotSupportedSnafu {
feat: "ADD CONSTRAINT",
}
.fail();
}
AlterTableOperation::AddColumn { column_def } => Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(
sql_column_def_to_grpc_column_def(column_def)
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
),
is_key: false,
}],
}),
AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: name.value.to_string(),
}],
}),
AlterTableOperation::RenameTable { new_table_name } => Kind::RenameTable(RenameTable {
new_table_name: new_table_name.to_string(),
}),
};
Ok(AlterExpr {
catalog_name,
schema_name,
table_name,
kind: Some(kind),
})
}
#[cfg(test)]
mod tests {
use session::context::QueryContext;

View File

@@ -18,13 +18,10 @@ use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::Mode;
use crate::grpc::GrpcOptions;
use crate::influxdb::InfluxdbOptions;
use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
use crate::prom::PromOptions;
use crate::prometheus::PrometheusOptions;
use crate::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions,
PrometheusOptions,
};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]

View File

@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod grpc;
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::{
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr,
FlushTableExpr, InsertRequest, TableId,
@@ -46,6 +46,7 @@ use partition::manager::PartitionInfo;
use partition::partition::{PartitionBound, PartitionDef};
use query::error::QueryExecutionSnafu;
use query::query_engine::SqlStatementExecutor;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{Ident, Value as SqlValue};
@@ -340,7 +341,7 @@ impl DistInstance {
Ok(Output::AffectedRows(0))
}
Statement::Alter(alter_table) => {
let expr = grpc::to_alter_expr(alter_table, query_ctx)?;
let expr = expr_factory::to_alter_expr(alter_table, query_ctx)?;
self.handle_alter_table(expr).await
}
Statement::DropTable(stmt) => {
@@ -598,6 +599,46 @@ impl SqlStatementExecutor for DistInstance {
}
}
#[async_trait]
impl GrpcQueryHandler for DistInstance {
type Error = error::Error;
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Insert(request) => self.handle_dist_insert(request, ctx).await,
Request::Delete(request) => self.handle_dist_delete(request, ctx).await,
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}
Request::Ddl(request) => {
let expr = request.expr.context(error::IncompleteGrpcResultSnafu {
err_msg: "Missing 'expr' in DDL request",
})?;
match expr {
DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, ctx).await,
DdlExpr::CreateTable(mut expr) => {
// TODO(LFC): Support creating distributed table through GRPC interface.
// Currently only SQL supports it; how to design the fields in CreateTableExpr?
let _ = self.create_table(&mut expr, None).await;
Ok(Output::AffectedRows(0))
}
DdlExpr::Alter(expr) => self.handle_alter_table(expr).await,
DdlExpr::DropTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.drop_table(table_name).await
}
DdlExpr::FlushTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.flush_table(table_name, expr.region_id).await
}
}
}
}
}
}
fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
if partitions.is_empty() {
return Ok(None);

View File

@@ -1,115 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::{alter_expr, AddColumn, AddColumns, AlterExpr, DropColumn, DropColumns, RenameTable};
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use common_query::Output;
use datanode::instance::sql::table_idents_to_full_name;
use meta_client::rpc::TableName;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::sql_column_def_to_grpc_column_def;
use crate::error::{self, ExternalSnafu, Result};
use crate::instance::distributed::DistInstance;
#[async_trait]
impl GrpcQueryHandler for DistInstance {
type Error = error::Error;
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Insert(request) => self.handle_dist_insert(request, ctx).await,
Request::Delete(request) => self.handle_dist_delete(request, ctx).await,
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}
Request::Ddl(request) => {
let expr = request.expr.context(error::IncompleteGrpcResultSnafu {
err_msg: "Missing 'expr' in DDL request",
})?;
match expr {
DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, ctx).await,
DdlExpr::CreateTable(mut expr) => {
// TODO(LFC): Support creating distributed table through GRPC interface.
// Currently only SQL supports it; how to design the fields in CreateTableExpr?
let _ = self.create_table(&mut expr, None).await;
Ok(Output::AffectedRows(0))
}
DdlExpr::Alter(expr) => self.handle_alter_table(expr).await,
DdlExpr::DropTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.drop_table(table_name).await
}
DdlExpr::FlushTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.flush_table(table_name, expr.region_id).await
}
}
}
}
}
}
pub(crate) fn to_alter_expr(
alter_table: AlterTable,
query_ctx: QueryContextRef,
) -> Result<AlterExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(alter_table.table_name(), query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let kind = match alter_table.alter_operation() {
AlterTableOperation::AddConstraint(_) => {
return error::NotSupportedSnafu {
feat: "ADD CONSTRAINT",
}
.fail();
}
AlterTableOperation::AddColumn { column_def } => Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(
sql_column_def_to_grpc_column_def(column_def)
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
),
is_key: false,
}],
}),
AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: name.value.to_string(),
}],
}),
AlterTableOperation::RenameTable { new_table_name } => Kind::RenameTable(RenameTable {
new_table_name: new_table_name.to_string(),
}),
};
Ok(AlterExpr {
catalog_name,
schema_name,
table_name,
kind: Some(kind),
})
}

View File

@@ -20,17 +20,11 @@ pub mod datanode;
pub mod error;
pub mod expr_factory;
pub mod frontend;
pub mod grpc;
pub mod heartbeat;
pub mod influxdb;
pub mod instance;
pub(crate) mod metrics;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prom;
pub mod prometheus;
mod script;
mod server;
pub mod service_config;
pub mod statement;
pub mod table;

View File

@@ -37,9 +37,8 @@ use snafu::ResultExt;
use crate::error::Error::StartServer;
use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
use crate::influxdb::InfluxdbOptions;
use crate::instance::FrontendInstance;
use crate::prometheus::PrometheusOptions;
use crate::service_config::{InfluxdbOptions, PrometheusOptions};
pub(crate) struct Services;

View File

@@ -0,0 +1,29 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod grpc;
pub mod influxdb;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prom;
pub mod prometheus;
pub use grpc::GrpcOptions;
pub use influxdb::InfluxdbOptions;
pub use mysql::MysqlOptions;
pub use opentsdb::OpentsdbOptions;
pub use postgres::PostgresOptions;
pub use prom::PromOptions;
pub use prometheus::PrometheusOptions;