diff --git a/Cargo.lock b/Cargo.lock index 4c16c66dff..be2e1b0926 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,6 +119,7 @@ name = "api" version = "0.1.0" dependencies = [ "common-base", + "common-error", "common-time", "datatypes", "prost 0.11.0", @@ -1727,6 +1728,7 @@ dependencies = [ "datafusion", "datafusion-common", "datatypes", + "frontend", "futures", "hyper", "log-store", @@ -5449,6 +5451,7 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" name = "sql" version = "0.1.0" dependencies = [ + "api", "catalog", "common-catalog", "common-error", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 01f7c3de8d..e9e07bf522 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] common-base = { path = "../common/base" } common-time = { path = "../common/time" } +common-error = { path = "../common/error" } datatypes = { path = "../datatypes" } prost = "0.11" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/api/greptime/v1/admin.proto b/src/api/greptime/v1/admin.proto index 842e885c72..5dcd021d2f 100644 --- a/src/api/greptime/v1/admin.proto +++ b/src/api/greptime/v1/admin.proto @@ -19,6 +19,7 @@ message AdminExpr { oneof expr { CreateExpr create = 2; AlterExpr alter = 3; + CreateDatabaseExpr create_database = 4; } } @@ -29,6 +30,7 @@ message AdminResult { } } +// TODO(hl): rename to CreateTableExpr message CreateExpr { optional string catalog_name = 1; optional string schema_name = 2; @@ -53,3 +55,8 @@ message AlterExpr { message AddColumn { ColumnDef column_def = 1; } + +message CreateDatabaseExpr { + //TODO(hl): maybe rename to schema_name? + string database_name = 1; +} diff --git a/src/api/src/error.rs b/src/api/src/error.rs index 9b97352626..6fc0eaf141 100644 --- a/src/api/src/error.rs +++ b/src/api/src/error.rs @@ -1,6 +1,10 @@ +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::prelude::StatusCode; use datatypes::prelude::ConcreteDataType; use snafu::prelude::*; -use snafu::Backtrace; +use snafu::{Backtrace, ErrorCompat}; pub type Result = std::result::Result; @@ -16,3 +20,19 @@ pub enum Error { backtrace: Backtrace, }, } + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::UnknownColumnDataType { .. } => StatusCode::InvalidArguments, + Error::IntoColumnDataType { .. } => StatusCode::Unexpected, + } + } + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/client/src/admin.rs b/src/client/src/admin.rs index 30be0bc9a5..a73d546d85 100644 --- a/src/client/src/admin.rs +++ b/src/client/src/admin.rs @@ -70,6 +70,17 @@ impl Admin { ); Ok(results) } + + pub async fn create_database(&self, expr: CreateDatabaseExpr) -> Result { + let header = ExprHeader { + version: PROTOCOL_VERSION, + }; + let expr = AdminExpr { + header: Some(header), + expr: Some(admin_expr::Expr::CreateDatabase(expr)), + }; + Ok(self.do_requests(vec![expr]).await?.remove(0)) + } } pub fn admin_result_to_output(admin_result: AdminResult) -> Result { diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 4b99482542..43280a895a 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -1,6 +1,7 @@ use clap::Parser; use common_telemetry::logging; -use datanode::datanode::{Datanode, DatanodeOptions, Mode}; +use datanode::datanode::{Datanode, DatanodeOptions}; +use frontend::frontend::Mode; use snafu::ResultExt; use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu}; @@ -120,6 +121,7 @@ impl TryFrom for DatanodeOptions { #[cfg(test)] mod tests { use datanode::datanode::ObjectStoreConfig; + use frontend::frontend::Mode; use super::*; diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index bd1036ffba..76e37d4295 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -56,8 +56,13 @@ pub struct StartCommand { impl StartCommand { async fn run(self) -> Result<()> { - let opts = self.try_into()?; - let mut frontend = Frontend::new(opts, Instance::new()); + let opts: FrontendOptions = self.try_into()?; + let mut frontend = Frontend::new( + opts.clone(), + Instance::try_new(&opts) + .await + .context(error::StartFrontendSnafu)?, + ); frontend.start().await.context(error::StartFrontendSnafu) } } diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 0d322df754..a97c11a715 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -51,6 +51,7 @@ tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" tower = { version = "0.4", features = ["full"] } tower-http = { version = "0.3", features = ["full"] } +frontend = { path = "../frontend" } [dependencies.arrow] package = "arrow2" diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index ce9256584b..466a0102f1 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -1,5 +1,8 @@ use std::sync::Arc; +use common_telemetry::info; +use frontend::frontend::Mode; +use meta_client::MetaClientOpts; use serde::{Deserialize, Serialize}; use crate::error::Result; @@ -20,13 +23,6 @@ impl Default for ObjectStoreConfig { } } -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "lowercase")] -pub enum Mode { - Standalone, - Distributed, -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DatanodeOptions { pub node_id: u64, @@ -72,7 +68,7 @@ pub struct Datanode { impl Datanode { pub async fn new(opts: DatanodeOptions) -> Result { let instance = Arc::new(Instance::new(&opts).await?); - let services = Services::try_new(instance.clone(), &opts)?; + let services = Services::try_new(instance.clone(), &opts).await?; Ok(Self { opts, services, @@ -81,27 +77,9 @@ impl Datanode { } pub async fn start(&mut self) -> Result<()> { + info!("Starting datanode instance..."); self.instance.start().await?; - self.services.start(&self.opts).await - } -} - -// Options for meta client in datanode instance. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct MetaClientOpts { - pub metasrv_addr: String, - pub timeout_millis: u64, - pub connect_timeout_millis: u64, - pub tcp_nodelay: bool, -} - -impl Default for MetaClientOpts { - fn default() -> Self { - Self { - metasrv_addr: "127.0.0.1:3002".to_string(), - timeout_millis: 3_000u64, - connect_timeout_millis: 5_000u64, - tcp_nodelay: true, - } + self.services.start(&self.opts).await?; + Ok(()) } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 1bd0553a21..2f7e7c40a9 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -283,6 +283,18 @@ pub enum Error { #[snafu(display("Insert batch is empty"))] EmptyInsertBatch, + + #[snafu(display("Failed to build frontend instance, source: {}", source))] + BuildFrontend { + #[snafu(backtrace)] + source: frontend::error::Error, + }, + + #[snafu(display("Failed to start frontend instance, source: {}", source))] + StartFrontend { + #[snafu(backtrace)] + source: frontend::error::Error, + }, } pub type Result = std::result::Result; @@ -351,6 +363,9 @@ impl ErrorExt for Error { Error::MetaClientInit { source, .. } => source.status_code(), Error::InsertData { source, .. } => source.status_code(), Error::EmptyInsertBatch => StatusCode::InvalidArguments, + Error::BuildFrontend { source, .. } | Error::StartFrontend { source, .. } => { + source.status_code() + } } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 139415e403..5cbe41404d 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -5,8 +5,10 @@ use catalog::remote::MetaKvBackend; use catalog::CatalogManagerRef; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_telemetry::logging::info; +use frontend::frontend::Mode; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; use meta_client::client::{MetaClient, MetaClientBuilder}; +use meta_client::MetaClientOpts; use object_store::{services::fs::Builder, util, ObjectStore}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; use snafu::prelude::*; @@ -14,7 +16,7 @@ use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; -use crate::datanode::{DatanodeOptions, MetaClientOpts, Mode, ObjectStoreConfig}; +use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; use crate::error::{self, CatalogSnafu, MetaClientInitSnafu, NewCatalogSnafu, Result}; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 2e4e33567d..e44345aa9e 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -3,10 +3,11 @@ use std::ops::Deref; use api::v1::codec::RegionNumber; use api::v1::{ admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, - ObjectExpr, ObjectResult, SelectExpr, + CreateDatabaseExpr, ObjectExpr, ObjectResult, SelectExpr, }; use async_trait::async_trait; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_insert::insertion_expr_to_request; use common_query::Output; @@ -15,7 +16,7 @@ use query::plan::LogicalPlan; use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler}; use snafu::prelude::*; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; -use table::requests::AddColumnRequest; +use table::requests::{AddColumnRequest, CreateDatabaseRequest}; use crate::error::{ CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu, @@ -23,7 +24,7 @@ use crate::error::{ UnsupportedExprSnafu, }; use crate::instance::Instance; -use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; +use crate::server::grpc::handler::{build_err_result, AdminResultBuilder, ObjectResultBuilder}; use crate::server::grpc::plan::PhysicalPlanner; use crate::server::grpc::select::to_object_result; use crate::sql::SqlRequest; @@ -203,6 +204,27 @@ impl Instance { } } + async fn execute_create_database( + &self, + create_database_expr: CreateDatabaseExpr, + ) -> AdminResult { + let req = CreateDatabaseRequest { + db_name: create_database_expr.database_name, + }; + let result = self.sql_handler.create_database(req).await; + match result { + Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), + Err(err) => AdminResultBuilder::default() + .status_code(err.status_code() as u32) + .err_msg(err.to_string()) + .build(), + } + } + async fn execute_logical(&self, plan_bytes: Vec) -> Result { let logical_plan_converter = DFLogicalSubstraitConvertor::new(self.catalog_manager.clone()); let logical_plan = logical_plan_converter @@ -271,6 +293,9 @@ impl GrpcAdminHandler for Instance { let admin_resp = match expr.expr { Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await, Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await, + Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => { + self.execute_create_database(create_database_expr).await + } other => { return servers::error::NotSupportedSnafu { feat: format!("{:?}", other), diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 9e17a6ebb8..4ab6a7ff32 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -3,84 +3,86 @@ pub mod grpc; use std::net::SocketAddr; use std::sync::Arc; +use common_error::prelude::BoxedError; use common_runtime::Builder as RuntimeBuilder; +use common_telemetry::info; +use frontend::frontend::{Frontend, FrontendOptions, Mode}; +use frontend::instance::Instance as FrontendInstanceImpl; use servers::grpc::GrpcServer; -use servers::http::HttpServer; -use servers::mysql::server::MysqlServer; -use servers::postgres::PostgresServer; use servers::server::Server; use snafu::ResultExt; use tokio::try_join; use crate::datanode::DatanodeOptions; -use crate::error::{self, Result}; +use crate::error::{ + BuildFrontendSnafu, ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu, +}; use crate::instance::InstanceRef; /// All rpc services. pub struct Services { - http_server: HttpServer, grpc_server: GrpcServer, - mysql_server: Box, - postgres_server: Box, + frontend: Option>, } impl Services { - pub fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result { - let mysql_io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(opts.mysql_runtime_size as usize) - .thread_name("mysql-io-handlers") - .build() - .context(error::RuntimeResourceSnafu)?, - ); - let postgres_io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(opts.postgres_runtime_size as usize) - .thread_name("postgres-io-handlers") - .build() - .context(error::RuntimeResourceSnafu)?, - ); + pub async fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result { let grpc_runtime = Arc::new( RuntimeBuilder::default() .worker_threads(opts.rpc_runtime_size as usize) .thread_name("grpc-io-handlers") .build() - .context(error::RuntimeResourceSnafu)?, + .context(RuntimeResourceSnafu)?, ); + + let frontend = match opts.mode { + Mode::Standalone => Some(Self::build_frontend(opts).await?), + Mode::Distributed => { + info!("Starting datanode in distributed mode, only gRPC server will be started."); + None + } + }; Ok(Self { - http_server: HttpServer::new(instance.clone()), grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime), - mysql_server: MysqlServer::create_server(instance.clone(), mysql_io_runtime), - postgres_server: Box::new(PostgresServer::new(instance, postgres_io_runtime)), + frontend, }) } - // TODO(LFC): make servers started on demand (not starting mysql if no needed, for example) - pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> { - let http_addr: SocketAddr = opts.http_addr.parse().context(error::ParseAddrSnafu { - addr: &opts.http_addr, - })?; + /// Build frontend instance in standalone mode + async fn build_frontend(opts: &DatanodeOptions) -> Result> { + let grpc_server_addr = &opts.rpc_addr; + info!( + "Build frontend with datanode gRPC addr: {}", + grpc_server_addr + ); + let options = FrontendOptions { + mode: Mode::Standalone, + datanode_rpc_addr: grpc_server_addr.clone(), + ..Default::default() + }; + let frontend_instance = FrontendInstanceImpl::try_new(&options) + .await + .context(BuildFrontendSnafu)?; + Ok(Frontend::new(options, frontend_instance)) + } - let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(error::ParseAddrSnafu { + pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> { + let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu { addr: &opts.rpc_addr, })?; - let mysql_addr: SocketAddr = opts.mysql_addr.parse().context(error::ParseAddrSnafu { - addr: &opts.mysql_addr, - })?; - - let postgres_addr: SocketAddr = - opts.postgres_addr.parse().context(error::ParseAddrSnafu { - addr: &opts.postgres_addr, - })?; - - try_join!( - self.http_server.start(http_addr), - self.grpc_server.start(grpc_addr), - self.mysql_server.start(mysql_addr), - self.postgres_server.start(postgres_addr), - ) - .context(error::StartServerSnafu)?; + try_join!(self.grpc_server.start(grpc_addr), async { + if let Some(ref mut frontend_instance) = self.frontend { + info!("Starting frontend instance"); + frontend_instance + .start() + .await + .map_err(BoxedError::new) + .context(servers::error::StartFrontendSnafu)?; + } + Ok(()) + }) + .context(StartServerSnafu)?; Ok(()) } } diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index da4df1e40e..63d98a40dc 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -12,9 +12,10 @@ impl SqlHandler { pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result { let ctx = EngineContext {}; let table_name = &req.table_name.clone(); - if !self.table_engine.table_exists(&ctx, table_name) { - return error::TableNotFoundSnafu { table_name }.fail(); - } + ensure!( + self.table_engine.table_exists(&ctx, table_name), + error::TableNotFoundSnafu { table_name } + ); self.table_engine .alter_table(&ctx, req) .await diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs index 50d5880339..1b17346869 100644 --- a/src/datatypes/src/schema/constraint.rs +++ b/src/datatypes/src/schema/constraint.rs @@ -30,11 +30,11 @@ impl TryFrom<&[u8]> for ColumnDefaultConstraint { } } -impl TryInto> for ColumnDefaultConstraint { +impl TryFrom for Vec { type Error = error::Error; - fn try_into(self) -> Result> { - let s = serde_json::to_string(&self).context(error::SerializeSnafu)?; + fn try_from(value: ColumnDefaultConstraint) -> std::result::Result { + let s = serde_json::to_string(&value).context(error::SerializeSnafu)?; Ok(s.into_bytes()) } } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 401adf4454..57eaf38cbc 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -21,7 +21,7 @@ common-time = { path = "../common/time" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } -datanode = { path = "../datanode" } + datatypes = { path = "../datatypes" } futures = "0.3" itertools = "0.10" @@ -46,6 +46,7 @@ version = "0.10" features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] [dev-dependencies] +datanode = { path = "../datanode" } chrono = "0.4" futures = "0.3" meta-srv = { path = "../meta-srv", features = ["mock"] } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 3f7666bbb3..4ee927ef88 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use datanode::datanode::Mode; use serde::{Deserialize, Serialize}; use snafu::prelude::*; @@ -24,6 +23,7 @@ pub struct FrontendOptions { pub influxdb_options: Option, pub prometheus_options: Option, pub mode: Mode, + pub datanode_rpc_addr: String, } impl Default for FrontendOptions { @@ -37,14 +37,14 @@ impl Default for FrontendOptions { influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), mode: Mode::Standalone, + datanode_rpc_addr: "127.0.0.1:3001".to_string(), } } } impl FrontendOptions { - // TODO(LFC) Get Datanode address from Meta. pub(crate) fn datanode_grpc_addr(&self) -> String { - "127.0.0.1:3001".to_string() + self.datanode_rpc_addr.clone() } } @@ -74,9 +74,16 @@ where .context(error::IllegalFrontendStateSnafu { err_msg: "Frontend instance not initialized", })?; - instance.start(&self.opts).await?; + instance.start().await?; let instance = Arc::new(instance); Services::start(&self.opts, instance).await } } + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum Mode { + Standalone, + Distributed, +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 203f5c0ca4..a4205e89d0 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -8,8 +8,8 @@ use std::time::Duration; use api::helper::ColumnDataTypeWrapper; use api::v1::{ - insert_expr, AdminExpr, AdminResult, ColumnDataType, ColumnDef as GrpcColumnDef, CreateExpr, - InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult, + insert_expr, AdminExpr, AdminResult, AlterExpr, ColumnDataType, ColumnDef as GrpcColumnDef, + CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult, }; use async_trait::async_trait; use catalog::remote::MetaKvBackend; @@ -18,9 +18,9 @@ use client::{Client, Database, Select}; use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_query::Output; -use datanode::datanode::{MetaClientOpts, Mode}; use datatypes::schema::ColumnSchema; use meta_client::client::MetaClientBuilder; +use meta_client::MetaClientOpts; use servers::error as server_error; use servers::query_handler::{ GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, @@ -36,7 +36,7 @@ use sql::{dialect::GenericDialect, parser::ParserContext}; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result}; -use crate::frontend::FrontendOptions; +use crate::frontend::{FrontendOptions, Mode}; use crate::table::route::TableRoutes; #[async_trait] @@ -51,36 +51,25 @@ pub trait FrontendInstance: + Sync + 'static { - async fn start(&mut self, opts: &FrontendOptions) -> Result<()>; + async fn start(&mut self) -> Result<()>; } +pub type FrontendInstanceRef = Arc; + #[derive(Default)] pub struct Instance { + // TODO(hl): In standalone mode, there is only one client. + // But in distribute mode, frontend should fetch datanodes' addresses from metasrv. client: Client, + /// catalog manager is None in standalone mode, datanode will keep their own catalog_manager: Option, } impl Instance { - pub fn new() -> Self { - Default::default() - } - - // TODO(fys): temporarily hard code - pub fn database(&self) -> Database { - Database::new("greptime", self.client.clone()) - } - - // TODO(fys): temporarily hard code - pub fn admin(&self) -> Admin { - Admin::new("greptime", self.client.clone()) - } -} - -#[async_trait] -impl FrontendInstance for Instance { - async fn start(&mut self, opts: &FrontendOptions) -> Result<()> { + pub async fn try_new(opts: &FrontendOptions) -> Result { + let mut instance = Instance::default(); let addr = opts.datanode_grpc_addr(); - self.client.start(vec![addr]); + instance.client.start(vec![addr]); let meta_client = match opts.mode { Mode::Standalone => None, @@ -102,7 +91,7 @@ impl FrontendInstance for Instance { } }; - self.catalog_manager = if let Some(meta_client) = meta_client { + instance.catalog_manager = if let Some(meta_client) = meta_client { let meta_backend = Arc::new(MetaKvBackend { client: meta_client.clone(), }); @@ -114,6 +103,24 @@ impl FrontendInstance for Instance { } else { None }; + Ok(instance) + } + + // TODO(fys): temporarily hard code + pub fn database(&self) -> Database { + Database::new("greptime", self.client.clone()) + } + + // TODO(fys): temporarily hard code + pub fn admin(&self) -> Admin { + Admin::new("greptime", self.client.clone()) + } +} + +#[async_trait] +impl FrontendInstance for Instance { + async fn start(&mut self) -> Result<()> { + // TODO(hl): Frontend init should move to here Ok(()) } } @@ -179,9 +186,34 @@ impl SqlQueryHandler for Instance { .await .and_then(admin_result_to_output) } - // TODO(LFC): Support other SQL execution, - // update, delete, alter, explain, etc. - _ => return server_error::NotSupportedSnafu { feat: query }.fail(), + + Statement::ShowDatabases(_) | Statement::ShowTables(_) => self + .database() + .select(Select::Sql(query.to_string())) + .await + .and_then(|object_result| object_result.try_into()), + + Statement::CreateDatabase(c) => { + let expr = CreateDatabaseExpr { + database_name: c.name.to_string(), + }; + self.admin() + .create_database(expr) + .await + .and_then(admin_result_to_output) + } + Statement::Alter(alter_stmt) => self + .admin() + .alter( + AlterExpr::try_from(alter_stmt) + .map_err(BoxedError::new) + .context(server_error::ExecuteAlterSnafu { query })?, + ) + .await + .and_then(admin_result_to_output), + Statement::ShowCreateTable(_) => { + return server_error::NotSupportedSnafu { feat: query }.fail() + } } .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }) diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 72a0eb31de..29c382bc15 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -2,6 +2,7 @@ use std::net::SocketAddr; use std::sync::Arc; use common_runtime::Builder as RuntimeBuilder; +use common_telemetry::info; use servers::grpc::GrpcServer; use servers::http::HttpServer; use servers::mysql::server::MysqlServer; @@ -24,6 +25,7 @@ impl Services { where T: FrontendInstance, { + info!("Starting frontend servers"); let grpc_server_and_addr = if let Some(opts) = &opts.grpc_options { let grpc_addr = parse_addr(&opts.addr)?; @@ -143,7 +145,9 @@ async fn start_server( server_and_addr: Option<(Box, SocketAddr)>, ) -> servers::error::Result> { if let Some((server, addr)) = server_and_addr { - server.start(addr).await.map(Some) + let res = server.start(addr).await.map(Some)?; + info!("Starting server at {}", addr); + Ok(res) } else { Ok(None) } diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 0d07295c41..e02c5b9216 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -1,5 +1,27 @@ +use serde::{Deserialize, Serialize}; + pub mod client; pub mod error; #[cfg(test)] mod mocks; pub mod rpc; + +// Options for meta client in datanode instance. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MetaClientOpts { + pub metasrv_addr: String, + pub timeout_millis: u64, + pub connect_timeout_millis: u64, + pub tcp_nodelay: bool, +} + +impl Default for MetaClientOpts { + fn default() -> Self { + Self { + metasrv_addr: "127.0.0.1:3002".to_string(), + timeout_millis: 3_000u64, + connect_timeout_millis: 5_000u64, + tcp_nodelay: true, + } + } +} diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 8963c6ae7c..2702232617 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -68,6 +68,13 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to execute alter: {}, source: {}", query, source))] + ExecuteAlter { + query: String, + #[snafu(backtrace)] + source: BoxedError, + }, + #[snafu(display("Failed to insert script with name: {}, source: {}", name, source))] InsertScript { name: String, @@ -161,6 +168,12 @@ pub enum Error { source: tonic_reflection::server::Error, backtrace: Backtrace, }, + + #[snafu(display("Failed to start frontend service, source: {}", source))] + StartFrontend { + #[snafu(backtrace)] + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -185,6 +198,7 @@ impl ErrorExt for Error { | ExecuteScript { source, .. } | ExecuteQuery { source, .. } | ExecuteInsert { source, .. } + | ExecuteAlter { source, .. } | PutOpentsdbDataPoint { source, .. } => source.status_code(), NotSupported { .. } @@ -201,6 +215,7 @@ impl ErrorExt for Error { InfluxdbLinesWrite { source, .. } => source.status_code(), Hyper { .. } => StatusCode::Unknown, + StartFrontend { source, .. } => source.status_code(), } } diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 5ec1e2912e..c2a1697a86 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +api = { path = "../api" } catalog = { path = "../catalog" } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index e589f4dfa3..26c5dd741c 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -92,6 +92,24 @@ pub enum Error { #[snafu(backtrace)] source: datatypes::error::Error, }, + + #[snafu(display("Unsupported ALTER TABLE statement: {}", msg))] + UnsupportedAlterTableStatement { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to serialize column default constraint, source: {}", source))] + SerializeColumnDefaultConstraint { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display( + "Failed to convert data type to gRPC data type defined in proto, source: {}", + source + ))] + ConvertToGrpcDataType { + #[snafu(backtrace)] + source: api::error::Error, + }, } impl ErrorExt for Error { @@ -112,6 +130,9 @@ impl ErrorExt for Error { InvalidDatabaseName { .. } | ColumnTypeMismatch { .. } | InvalidTableName { .. } => { StatusCode::InvalidArguments } + UnsupportedAlterTableStatement { .. } => StatusCode::InvalidSyntax, + SerializeColumnDefaultConstraint { source, .. } => source.status_code(), + ConvertToGrpcDataType { source, .. } => source.status_code(), } } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 0f60898ab8..0adf21d395 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -7,6 +7,7 @@ pub mod statement; use std::str::FromStr; +use api::helper::ColumnDataTypeWrapper; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; @@ -20,7 +21,8 @@ use crate::ast::{ Value as SqlValue, }; use crate::error::{ - self, ColumnTypeMismatchSnafu, ParseSqlValueSnafu, Result, UnsupportedDefaultValueSnafu, + self, ColumnTypeMismatchSnafu, ConvertToGrpcDataTypeSnafu, ParseSqlValueSnafu, Result, + SerializeColumnDefaultConstraintSnafu, UnsupportedDefaultValueSnafu, }; /// Converts maybe fully-qualified table name (`..` or `
` when @@ -239,6 +241,31 @@ pub fn column_def_to_schema(column_def: &ColumnDef, is_time_index: bool) -> Resu }) } +/// Convert `ColumnDef` in sqlparser to `ColumnDef` in gRPC proto. +fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result { + let name = col.name.value.clone(); + let data_type = sql_data_type_to_concrete_data_type(&col.data_type)?; + let nullable = col + .options + .iter() + .any(|o| matches!(o.option, ColumnOption::Null)); + + let default_constraint = parse_column_default_constraint(&name, &data_type, &col.options)? + .map(ColumnDefaultConstraint::try_into) // serialize default constraint to bytes + .transpose() + .context(SerializeColumnDefaultConstraintSnafu)?; + + let data_type = ColumnDataTypeWrapper::try_from(data_type) + .context(ConvertToGrpcDataTypeSnafu)? + .datatype() as i32; + Ok(api::v1::ColumnDef { + name, + datatype: data_type, + is_nullable: nullable, + default_constraint, + }) +} + pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result { match data_type { SqlDataType::BigInt(_) => Ok(ConcreteDataType::int64_datatype()), diff --git a/src/sql/src/statements/alter.rs b/src/sql/src/statements/alter.rs index cdd2ea7ea8..ad2a124780 100644 --- a/src/sql/src/statements/alter.rs +++ b/src/sql/src/statements/alter.rs @@ -1,5 +1,9 @@ +use api::v1::{alter_expr, AlterExpr}; use sqlparser::ast::{ColumnDef, ObjectName, TableConstraint}; +use crate::error::UnsupportedAlterTableStatementSnafu; +use crate::statements::{sql_column_def_to_grpc_column_def, table_idents_to_full_name}; + #[derive(Debug, Clone, PartialEq, Eq)] pub struct AlterTable { table_name: ObjectName, @@ -29,4 +33,36 @@ pub enum AlterTableOperation { AddConstraint(TableConstraint), /// `ADD [ COLUMN ] ` AddColumn { column_def: ColumnDef }, + // TODO(hl): support remove column +} + +/// Convert `AlterTable` statement to `AlterExpr` for gRPC +impl TryFrom for AlterExpr { + type Error = crate::error::Error; + + fn try_from(value: AlterTable) -> Result { + let (catalog, schema, table) = table_idents_to_full_name(&value.table_name)?; + + let kind = match value.alter_operation { + AlterTableOperation::AddConstraint(_) => { + return UnsupportedAlterTableStatementSnafu { + msg: "ADD CONSTRAINT not supported yet.", + } + .fail(); + } + AlterTableOperation::AddColumn { column_def } => { + alter_expr::Kind::AddColumn(api::v1::AddColumn { + column_def: Some(sql_column_def_to_grpc_column_def(column_def)?), + }) + } + }; + let expr = AlterExpr { + catalog_name: Some(catalog), + schema_name: Some(schema), + table_name: table, + kind: Some(kind), + }; + + Ok(expr) + } }