From c3eeda7d8476f4b53621f01bad616be1603a0ffb Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 20 May 2023 02:09:20 +0800 Subject: [PATCH] refactor(frontend): adjust code structure (#1615) * move to expr_factory Signed-off-by: Ruihang Xia * move configs into service_config Signed-off-by: Ruihang Xia * move GrpcQueryHandler into distributed.rs Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/cmd/src/frontend.rs | 5 +- src/cmd/src/standalone.rs | 11 +- src/frontend/src/expr_factory.rs | 57 ++++++++- src/frontend/src/frontend.rs | 11 +- src/frontend/src/instance/distributed.rs | 47 ++++++- src/frontend/src/instance/distributed/grpc.rs | 115 ------------------ src/frontend/src/lib.rs | 8 +- src/frontend/src/server.rs | 3 +- src/frontend/src/service_config.rs | 29 +++++ src/frontend/src/{ => service_config}/grpc.rs | 0 .../src/{ => service_config}/influxdb.rs | 0 .../src/{ => service_config}/mysql.rs | 0 .../src/{ => service_config}/opentsdb.rs | 0 .../src/{ => service_config}/postgres.rs | 0 src/frontend/src/{ => service_config}/prom.rs | 0 .../src/{ => service_config}/prometheus.rs | 0 16 files changed, 138 insertions(+), 148 deletions(-) delete mode 100644 src/frontend/src/instance/distributed/grpc.rs create mode 100644 src/frontend/src/service_config.rs rename src/frontend/src/{ => service_config}/grpc.rs (100%) rename src/frontend/src/{ => service_config}/influxdb.rs (100%) rename src/frontend/src/{ => service_config}/mysql.rs (100%) rename src/frontend/src/{ => service_config}/opentsdb.rs (100%) rename src/frontend/src/{ => service_config}/postgres.rs (100%) rename src/frontend/src/{ => service_config}/prom.rs (100%) rename src/frontend/src/{ => service_config}/prometheus.rs (100%) diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index ba57a99a54..37117895fa 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -17,9 +17,8 @@ use std::sync::Arc; use clap::Parser; use common_base::Plugins; use frontend::frontend::FrontendOptions; -use frontend::influxdb::InfluxdbOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; -use frontend::prom::PromOptions; +use frontend::service_config::{InfluxdbOptions, PromOptions}; use meta_client::MetaClientOptions; use servers::auth::UserProviderRef; use servers::tls::{TlsMode, TlsOption}; @@ -227,7 +226,7 @@ mod tests { use std::time::Duration; use common_test_util::temp_dir::create_named_temp_file; - use frontend::grpc::GrpcOptions; + use frontend::service_config::GrpcOptions; use servers::auth::{Identity, Password, UserProviderRef}; use super::*; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index f53c11ede3..97415329b1 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -21,14 +21,11 @@ use common_telemetry::logging::LoggingOptions; use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig, WalConfig}; use datanode::instance::InstanceRef; use frontend::frontend::FrontendOptions; -use frontend::grpc::GrpcOptions; -use frontend::influxdb::InfluxdbOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; -use frontend::mysql::MysqlOptions; -use frontend::opentsdb::OpentsdbOptions; -use frontend::postgres::PostgresOptions; -use frontend::prom::PromOptions; -use frontend::prometheus::PrometheusOptions; +use frontend::service_config::{ + GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions, + PrometheusOptions, +}; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index d59d58906b..f2541f33e5 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -16,7 +16,11 @@ use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::{Column, ColumnDataType, CreateTableExpr}; +use api::v1::alter_expr::Kind; +use api::v1::{ + AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, CreateTableExpr, DropColumn, + DropColumns, RenameTable, +}; use common_error::prelude::BoxedError; use datanode::instance::sql::table_idents_to_full_name; use datatypes::schema::ColumnSchema; @@ -25,15 +29,16 @@ use query::sql::prepare_immutable_file_table_files_and_schema; use session::context::QueryContextRef; use snafu::{ensure, ResultExt}; use sql::ast::{ColumnDef, ColumnOption, TableConstraint}; -use sql::statements::column_def_to_schema; +use sql::statements::alter::{AlterTable, AlterTableOperation}; use sql::statements::create::{CreateExternalTable, CreateTable, TIME_INDEX}; +use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def}; use sql::util::to_lowercase_options_map; use table::requests::{TableOptions, IMMUTABLE_TABLE_META_KEY}; use crate::error::{ self, BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, - ConvertColumnDefaultConstraintSnafu, IllegalPrimaryKeysDefSnafu, InvalidSqlSnafu, - ParseSqlSnafu, Result, + ConvertColumnDefaultConstraintSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, + InvalidSqlSnafu, ParseSqlSnafu, Result, }; pub type CreateExprFactoryRef = Arc; @@ -270,6 +275,50 @@ pub(crate) fn column_schemas_to_defs( .collect() } +pub(crate) fn to_alter_expr( + alter_table: AlterTable, + query_ctx: QueryContextRef, +) -> Result { + let (catalog_name, schema_name, table_name) = + table_idents_to_full_name(alter_table.table_name(), query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let kind = match alter_table.alter_operation() { + AlterTableOperation::AddConstraint(_) => { + return error::NotSupportedSnafu { + feat: "ADD CONSTRAINT", + } + .fail(); + } + AlterTableOperation::AddColumn { column_def } => Kind::AddColumns(AddColumns { + add_columns: vec![AddColumn { + column_def: Some( + sql_column_def_to_grpc_column_def(column_def) + .map_err(BoxedError::new) + .context(ExternalSnafu)?, + ), + is_key: false, + }], + }), + AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns { + drop_columns: vec![DropColumn { + name: name.value.to_string(), + }], + }), + AlterTableOperation::RenameTable { new_table_name } => Kind::RenameTable(RenameTable { + new_table_name: new_table_name.to_string(), + }), + }; + + Ok(AlterExpr { + catalog_name, + schema_name, + table_name, + kind: Some(kind), + }) +} + #[cfg(test)] mod tests { use session::context::QueryContext; diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 59c6a99728..ab8dbde174 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -18,13 +18,10 @@ use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use servers::Mode; -use crate::grpc::GrpcOptions; -use crate::influxdb::InfluxdbOptions; -use crate::mysql::MysqlOptions; -use crate::opentsdb::OpentsdbOptions; -use crate::postgres::PostgresOptions; -use crate::prom::PromOptions; -use crate::prometheus::PrometheusOptions; +use crate::service_config::{ + GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions, + PrometheusOptions, +}; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 5f925517c9..e29aba8570 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod grpc; - use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; +use api::v1::ddl_request::Expr as DdlExpr; +use api::v1::greptime_request::Request; use api::v1::{ column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr, FlushTableExpr, InsertRequest, TableId, @@ -46,6 +46,7 @@ use partition::manager::PartitionInfo; use partition::partition::{PartitionBound, PartitionDef}; use query::error::QueryExecutionSnafu; use query::query_engine::SqlStatementExecutor; +use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::{Ident, Value as SqlValue}; @@ -340,7 +341,7 @@ impl DistInstance { Ok(Output::AffectedRows(0)) } Statement::Alter(alter_table) => { - let expr = grpc::to_alter_expr(alter_table, query_ctx)?; + let expr = expr_factory::to_alter_expr(alter_table, query_ctx)?; self.handle_alter_table(expr).await } Statement::DropTable(stmt) => { @@ -598,6 +599,46 @@ impl SqlStatementExecutor for DistInstance { } } +#[async_trait] +impl GrpcQueryHandler for DistInstance { + type Error = error::Error; + + async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { + match request { + Request::Insert(request) => self.handle_dist_insert(request, ctx).await, + Request::Delete(request) => self.handle_dist_delete(request, ctx).await, + Request::Query(_) => { + unreachable!("Query should have been handled directly in Frontend Instance!") + } + Request::Ddl(request) => { + let expr = request.expr.context(error::IncompleteGrpcResultSnafu { + err_msg: "Missing 'expr' in DDL request", + })?; + match expr { + DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, ctx).await, + DdlExpr::CreateTable(mut expr) => { + // TODO(LFC): Support creating distributed table through GRPC interface. + // Currently only SQL supports it; how to design the fields in CreateTableExpr? + let _ = self.create_table(&mut expr, None).await; + Ok(Output::AffectedRows(0)) + } + DdlExpr::Alter(expr) => self.handle_alter_table(expr).await, + DdlExpr::DropTable(expr) => { + let table_name = + TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); + self.drop_table(table_name).await + } + DdlExpr::FlushTable(expr) => { + let table_name = + TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); + self.flush_table(table_name, expr.region_id).await + } + } + } + } + } +} + fn create_partitions_stmt(partitions: Vec) -> Result> { if partitions.is_empty() { return Ok(None); diff --git a/src/frontend/src/instance/distributed/grpc.rs b/src/frontend/src/instance/distributed/grpc.rs deleted file mode 100644 index 1befeae4f6..0000000000 --- a/src/frontend/src/instance/distributed/grpc.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use alter_expr::Kind; -use api::v1::ddl_request::Expr as DdlExpr; -use api::v1::greptime_request::Request; -use api::v1::{alter_expr, AddColumn, AddColumns, AlterExpr, DropColumn, DropColumns, RenameTable}; -use async_trait::async_trait; -use common_error::prelude::BoxedError; -use common_query::Output; -use datanode::instance::sql::table_idents_to_full_name; -use meta_client::rpc::TableName; -use servers::query_handler::grpc::GrpcQueryHandler; -use session::context::QueryContextRef; -use snafu::{OptionExt, ResultExt}; -use sql::statements::alter::{AlterTable, AlterTableOperation}; -use sql::statements::sql_column_def_to_grpc_column_def; - -use crate::error::{self, ExternalSnafu, Result}; -use crate::instance::distributed::DistInstance; - -#[async_trait] -impl GrpcQueryHandler for DistInstance { - type Error = error::Error; - - async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { - match request { - Request::Insert(request) => self.handle_dist_insert(request, ctx).await, - Request::Delete(request) => self.handle_dist_delete(request, ctx).await, - Request::Query(_) => { - unreachable!("Query should have been handled directly in Frontend Instance!") - } - Request::Ddl(request) => { - let expr = request.expr.context(error::IncompleteGrpcResultSnafu { - err_msg: "Missing 'expr' in DDL request", - })?; - match expr { - DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, ctx).await, - DdlExpr::CreateTable(mut expr) => { - // TODO(LFC): Support creating distributed table through GRPC interface. - // Currently only SQL supports it; how to design the fields in CreateTableExpr? - let _ = self.create_table(&mut expr, None).await; - Ok(Output::AffectedRows(0)) - } - DdlExpr::Alter(expr) => self.handle_alter_table(expr).await, - DdlExpr::DropTable(expr) => { - let table_name = - TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); - self.drop_table(table_name).await - } - DdlExpr::FlushTable(expr) => { - let table_name = - TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); - self.flush_table(table_name, expr.region_id).await - } - } - } - } - } -} - -pub(crate) fn to_alter_expr( - alter_table: AlterTable, - query_ctx: QueryContextRef, -) -> Result { - let (catalog_name, schema_name, table_name) = - table_idents_to_full_name(alter_table.table_name(), query_ctx) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - - let kind = match alter_table.alter_operation() { - AlterTableOperation::AddConstraint(_) => { - return error::NotSupportedSnafu { - feat: "ADD CONSTRAINT", - } - .fail(); - } - AlterTableOperation::AddColumn { column_def } => Kind::AddColumns(AddColumns { - add_columns: vec![AddColumn { - column_def: Some( - sql_column_def_to_grpc_column_def(column_def) - .map_err(BoxedError::new) - .context(ExternalSnafu)?, - ), - is_key: false, - }], - }), - AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns { - drop_columns: vec![DropColumn { - name: name.value.to_string(), - }], - }), - AlterTableOperation::RenameTable { new_table_name } => Kind::RenameTable(RenameTable { - new_table_name: new_table_name.to_string(), - }), - }; - - Ok(AlterExpr { - catalog_name, - schema_name, - table_name, - kind: Some(kind), - }) -} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 412c19c79d..def0982ad8 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -20,17 +20,11 @@ pub mod datanode; pub mod error; pub mod expr_factory; pub mod frontend; -pub mod grpc; pub mod heartbeat; -pub mod influxdb; pub mod instance; pub(crate) mod metrics; -pub mod mysql; -pub mod opentsdb; -pub mod postgres; -pub mod prom; -pub mod prometheus; mod script; mod server; +pub mod service_config; pub mod statement; pub mod table; diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 6d71d42c1e..9ab3a45625 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -37,9 +37,8 @@ use snafu::ResultExt; use crate::error::Error::StartServer; use crate::error::{self, Result}; use crate::frontend::FrontendOptions; -use crate::influxdb::InfluxdbOptions; use crate::instance::FrontendInstance; -use crate::prometheus::PrometheusOptions; +use crate::service_config::{InfluxdbOptions, PrometheusOptions}; pub(crate) struct Services; diff --git a/src/frontend/src/service_config.rs b/src/frontend/src/service_config.rs new file mode 100644 index 0000000000..b4435207bd --- /dev/null +++ b/src/frontend/src/service_config.rs @@ -0,0 +1,29 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod grpc; +pub mod influxdb; +pub mod mysql; +pub mod opentsdb; +pub mod postgres; +pub mod prom; +pub mod prometheus; + +pub use grpc::GrpcOptions; +pub use influxdb::InfluxdbOptions; +pub use mysql::MysqlOptions; +pub use opentsdb::OpentsdbOptions; +pub use postgres::PostgresOptions; +pub use prom::PromOptions; +pub use prometheus::PrometheusOptions; diff --git a/src/frontend/src/grpc.rs b/src/frontend/src/service_config/grpc.rs similarity index 100% rename from src/frontend/src/grpc.rs rename to src/frontend/src/service_config/grpc.rs diff --git a/src/frontend/src/influxdb.rs b/src/frontend/src/service_config/influxdb.rs similarity index 100% rename from src/frontend/src/influxdb.rs rename to src/frontend/src/service_config/influxdb.rs diff --git a/src/frontend/src/mysql.rs b/src/frontend/src/service_config/mysql.rs similarity index 100% rename from src/frontend/src/mysql.rs rename to src/frontend/src/service_config/mysql.rs diff --git a/src/frontend/src/opentsdb.rs b/src/frontend/src/service_config/opentsdb.rs similarity index 100% rename from src/frontend/src/opentsdb.rs rename to src/frontend/src/service_config/opentsdb.rs diff --git a/src/frontend/src/postgres.rs b/src/frontend/src/service_config/postgres.rs similarity index 100% rename from src/frontend/src/postgres.rs rename to src/frontend/src/service_config/postgres.rs diff --git a/src/frontend/src/prom.rs b/src/frontend/src/service_config/prom.rs similarity index 100% rename from src/frontend/src/prom.rs rename to src/frontend/src/service_config/prom.rs diff --git a/src/frontend/src/prometheus.rs b/src/frontend/src/service_config/prometheus.rs similarity index 100% rename from src/frontend/src/prometheus.rs rename to src/frontend/src/service_config/prometheus.rs