refactor: datanode starts frontend (#471)

* refactor: reverse the dependency direction, so that datanode depends on frontend instead of frontend depending on datanode

* wip: start frontend in datanode

* wip: migrate create database to frontend

* wip: impl alter table

* fix: CR comments
Author: Lei, Huang
Date: 2022-11-12 21:07:18 +08:00
Committed by: GitHub
Parent: 0d4c191a06
Commit: 2d869e1e43
25 changed files with 365 additions and 126 deletions

Cargo.lock

@@ -119,6 +119,7 @@ name = "api"
version = "0.1.0"
dependencies = [
"common-base",
"common-error",
"common-time",
"datatypes",
"prost 0.11.0",
@@ -1727,6 +1728,7 @@ dependencies = [
"datafusion",
"datafusion-common",
"datatypes",
"frontend",
"futures",
"hyper",
"log-store",
@@ -5449,6 +5451,7 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
name = "sql"
version = "0.1.0"
dependencies = [
"api",
"catalog",
"common-catalog",
"common-error",


@@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
common-base = { path = "../common/base" }
common-time = { path = "../common/time" }
common-error = { path = "../common/error" }
datatypes = { path = "../datatypes" }
prost = "0.11"
snafu = { version = "0.7", features = ["backtraces"] }


@@ -19,6 +19,7 @@ message AdminExpr {
oneof expr {
CreateExpr create = 2;
AlterExpr alter = 3;
CreateDatabaseExpr create_database = 4;
}
}
@@ -29,6 +30,7 @@ message AdminResult {
}
}
// TODO(hl): rename to CreateTableExpr
message CreateExpr {
optional string catalog_name = 1;
optional string schema_name = 2;
@@ -53,3 +55,8 @@ message AlterExpr {
message AddColumn {
ColumnDef column_def = 1;
}
message CreateDatabaseExpr {
//TODO(hl): maybe rename to schema_name?
string database_name = 1;
}


@@ -1,6 +1,10 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::prelude::StatusCode;
use datatypes::prelude::ConcreteDataType;
use snafu::prelude::*;
use snafu::Backtrace;
use snafu::{Backtrace, ErrorCompat};
pub type Result<T> = std::result::Result<T, Error>;
@@ -16,3 +20,19 @@ pub enum Error {
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::UnknownColumnDataType { .. } => StatusCode::InvalidArguments,
Error::IntoColumnDataType { .. } => StatusCode::Unexpected,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}


@@ -70,6 +70,17 @@ impl Admin {
);
Ok(results)
}
pub async fn create_database(&self, expr: CreateDatabaseExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = AdminExpr {
header: Some(header),
expr: Some(admin_expr::Expr::CreateDatabase(expr)),
};
Ok(self.do_requests(vec![expr]).await?.remove(0))
}
}
pub fn admin_result_to_output(admin_result: AdminResult) -> Result<Output> {
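Note: a rough usage sketch of the new admin call, assuming an already-connected Admin handle like the ones this diff builds elsewhere with Admin::new("greptime", client); the function name and database name are placeholders, and the error wrapping is only illustrative.

    async fn create_database_example(admin: &Admin) -> Result<Output, Box<dyn std::error::Error>> {
        // Wraps the expression in an AdminExpr and sends it over the existing admin channel.
        let result = admin
            .create_database(CreateDatabaseExpr {
                database_name: "my_db".to_string(), // placeholder name
            })
            .await?;
        // admin_result_to_output (shown just above) turns the gRPC AdminResult into an Output.
        Ok(admin_result_to_output(result)?)
    }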


@@ -1,6 +1,7 @@
use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{Datanode, DatanodeOptions, Mode};
use datanode::datanode::{Datanode, DatanodeOptions};
use frontend::frontend::Mode;
use snafu::ResultExt;
use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu};
@@ -120,6 +121,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
#[cfg(test)]
mod tests {
use datanode::datanode::ObjectStoreConfig;
use frontend::frontend::Mode;
use super::*;


@@ -56,8 +56,13 @@ pub struct StartCommand {
impl StartCommand {
async fn run(self) -> Result<()> {
let opts = self.try_into()?;
let mut frontend = Frontend::new(opts, Instance::new());
let opts: FrontendOptions = self.try_into()?;
let mut frontend = Frontend::new(
opts.clone(),
Instance::try_new(&opts)
.await
.context(error::StartFrontendSnafu)?,
);
frontend.start().await.context(error::StartFrontendSnafu)
}
}


@@ -51,6 +51,7 @@ tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
frontend = { path = "../frontend" }
[dependencies.arrow]
package = "arrow2"


@@ -1,5 +1,8 @@
use std::sync::Arc;
use common_telemetry::info;
use frontend::frontend::Mode;
use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use crate::error::Result;
@@ -20,13 +23,6 @@ impl Default for ObjectStoreConfig {
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
Standalone,
Distributed,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatanodeOptions {
pub node_id: u64,
@@ -72,7 +68,7 @@ pub struct Datanode {
impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let instance = Arc::new(Instance::new(&opts).await?);
let services = Services::try_new(instance.clone(), &opts)?;
let services = Services::try_new(instance.clone(), &opts).await?;
Ok(Self {
opts,
services,
@@ -81,27 +77,9 @@ impl Datanode {
}
pub async fn start(&mut self) -> Result<()> {
info!("Starting datanode instance...");
self.instance.start().await?;
self.services.start(&self.opts).await
}
}
// Options for meta client in datanode instance.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MetaClientOpts {
pub metasrv_addr: String,
pub timeout_millis: u64,
pub connect_timeout_millis: u64,
pub tcp_nodelay: bool,
}
impl Default for MetaClientOpts {
fn default() -> Self {
Self {
metasrv_addr: "127.0.0.1:3002".to_string(),
timeout_millis: 3_000u64,
connect_timeout_millis: 5_000u64,
tcp_nodelay: true,
}
self.services.start(&self.opts).await?;
Ok(())
}
}


@@ -283,6 +283,18 @@ pub enum Error {
#[snafu(display("Insert batch is empty"))]
EmptyInsertBatch,
#[snafu(display("Failed to build frontend instance, source: {}", source))]
BuildFrontend {
#[snafu(backtrace)]
source: frontend::error::Error,
},
#[snafu(display("Failed to start frontend instance, source: {}", source))]
StartFrontend {
#[snafu(backtrace)]
source: frontend::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -351,6 +363,9 @@ impl ErrorExt for Error {
Error::MetaClientInit { source, .. } => source.status_code(),
Error::InsertData { source, .. } => source.status_code(),
Error::EmptyInsertBatch => StatusCode::InvalidArguments,
Error::BuildFrontend { source, .. } | Error::StartFrontend { source, .. } => {
source.status_code()
}
}
}


@@ -5,8 +5,10 @@ use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
use frontend::frontend::Mode;
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use object_store::{services::fs::Builder, util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use snafu::prelude::*;
@@ -14,7 +16,7 @@ use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
use table_engine::config::EngineConfig as TableEngineConfig;
use table_engine::engine::MitoEngine;
use crate::datanode::{DatanodeOptions, MetaClientOpts, Mode, ObjectStoreConfig};
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{self, CatalogSnafu, MetaClientInitSnafu, NewCatalogSnafu, Result};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;


@@ -3,10 +3,11 @@ use std::ops::Deref;
use api::v1::codec::RegionNumber;
use api::v1::{
admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult,
ObjectExpr, ObjectResult, SelectExpr,
CreateDatabaseExpr, ObjectExpr, ObjectResult, SelectExpr,
};
use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_insert::insertion_expr_to_request;
use common_query::Output;
@@ -15,7 +16,7 @@ use query::plan::LogicalPlan;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler};
use snafu::prelude::*;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::requests::AddColumnRequest;
use table::requests::{AddColumnRequest, CreateDatabaseRequest};
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu,
@@ -23,7 +24,7 @@ use crate::error::{
UnsupportedExprSnafu,
};
use crate::instance::Instance;
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::handler::{build_err_result, AdminResultBuilder, ObjectResultBuilder};
use crate::server::grpc::plan::PhysicalPlanner;
use crate::server::grpc::select::to_object_result;
use crate::sql::SqlRequest;
@@ -203,6 +204,27 @@ impl Instance {
}
}
async fn execute_create_database(
&self,
create_database_expr: CreateDatabaseExpr,
) -> AdminResult {
let req = CreateDatabaseRequest {
db_name: create_database_expr.database_name,
};
let result = self.sql_handler.create_database(req).await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
async fn execute_logical(&self, plan_bytes: Vec<u8>) -> Result<Output> {
let logical_plan_converter = DFLogicalSubstraitConvertor::new(self.catalog_manager.clone());
let logical_plan = logical_plan_converter
@@ -271,6 +293,9 @@ impl GrpcAdminHandler for Instance {
let admin_resp = match expr.expr {
Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await,
Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await,
Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => {
self.execute_create_database(create_database_expr).await
}
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{:?}", other),
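Note: a minimal stand-in sketch of the success/error mapping execute_create_database performs above (simplified types, not the real AdminResultBuilder or StatusCode). CREATE DATABASE can only produce Output::AffectedRows, which is why the stream and record-batch arms are unreachable.

    enum Output {
        AffectedRows(usize),
    }

    #[derive(Debug)]
    struct AdminResult {
        status_code: u32,
        err_msg: String,
        affected_rows: u32,
    }

    fn to_admin_result(result: Result<Output, String>) -> AdminResult {
        match result {
            // DDL only reports how many rows (schemas) were affected.
            Ok(Output::AffectedRows(rows)) => AdminResult {
                status_code: 0, // stands in for StatusCode::Success as u32
                err_msg: String::new(),
                affected_rows: rows as u32,
            },
            Err(msg) => AdminResult {
                status_code: 1, // stands in for err.status_code() as u32
                err_msg: msg,
                affected_rows: 0,
            },
        }
    }

    fn main() {
        assert_eq!(to_admin_result(Ok(Output::AffectedRows(1))).affected_rows, 1);
        assert_eq!(to_admin_result(Err("schema exists".to_string())).status_code, 1);
    }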


@@ -3,84 +3,86 @@ pub mod grpc;
use std::net::SocketAddr;
use std::sync::Arc;
use common_error::prelude::BoxedError;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::info;
use frontend::frontend::{Frontend, FrontendOptions, Mode};
use frontend::instance::Instance as FrontendInstanceImpl;
use servers::grpc::GrpcServer;
use servers::http::HttpServer;
use servers::mysql::server::MysqlServer;
use servers::postgres::PostgresServer;
use servers::server::Server;
use snafu::ResultExt;
use tokio::try_join;
use crate::datanode::DatanodeOptions;
use crate::error::{self, Result};
use crate::error::{
BuildFrontendSnafu, ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu,
};
use crate::instance::InstanceRef;
/// All rpc services.
pub struct Services {
http_server: HttpServer,
grpc_server: GrpcServer,
mysql_server: Box<dyn Server>,
postgres_server: Box<dyn Server>,
frontend: Option<Frontend<FrontendInstanceImpl>>,
}
impl Services {
pub fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result<Self> {
let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size as usize)
.thread_name("mysql-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
let postgres_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.postgres_runtime_size as usize)
.thread_name("postgres-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
pub async fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result<Self> {
let grpc_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.rpc_runtime_size as usize)
.thread_name("grpc-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
.context(RuntimeResourceSnafu)?,
);
let frontend = match opts.mode {
Mode::Standalone => Some(Self::build_frontend(opts).await?),
Mode::Distributed => {
info!("Starting datanode in distributed mode, only gRPC server will be started.");
None
}
};
Ok(Self {
http_server: HttpServer::new(instance.clone()),
grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime),
mysql_server: MysqlServer::create_server(instance.clone(), mysql_io_runtime),
postgres_server: Box::new(PostgresServer::new(instance, postgres_io_runtime)),
frontend,
})
}
// TODO(LFC): make servers started on demand (not starting mysql if no needed, for example)
pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> {
let http_addr: SocketAddr = opts.http_addr.parse().context(error::ParseAddrSnafu {
addr: &opts.http_addr,
})?;
/// Build frontend instance in standalone mode
async fn build_frontend(opts: &DatanodeOptions) -> Result<Frontend<FrontendInstanceImpl>> {
let grpc_server_addr = &opts.rpc_addr;
info!(
"Build frontend with datanode gRPC addr: {}",
grpc_server_addr
);
let options = FrontendOptions {
mode: Mode::Standalone,
datanode_rpc_addr: grpc_server_addr.clone(),
..Default::default()
};
let frontend_instance = FrontendInstanceImpl::try_new(&options)
.await
.context(BuildFrontendSnafu)?;
Ok(Frontend::new(options, frontend_instance))
}
let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(error::ParseAddrSnafu {
pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> {
let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;
let mysql_addr: SocketAddr = opts.mysql_addr.parse().context(error::ParseAddrSnafu {
addr: &opts.mysql_addr,
})?;
let postgres_addr: SocketAddr =
opts.postgres_addr.parse().context(error::ParseAddrSnafu {
addr: &opts.postgres_addr,
})?;
try_join!(
self.http_server.start(http_addr),
self.grpc_server.start(grpc_addr),
self.mysql_server.start(mysql_addr),
self.postgres_server.start(postgres_addr),
)
.context(error::StartServerSnafu)?;
try_join!(self.grpc_server.start(grpc_addr), async {
if let Some(ref mut frontend_instance) = self.frontend {
info!("Starting frontend instance");
frontend_instance
.start()
.await
.map_err(BoxedError::new)
.context(servers::error::StartFrontendSnafu)?;
}
Ok(())
})
.context(StartServerSnafu)?;
Ok(())
}
}
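Note: a minimal runnable sketch of the startup shape used above, with stand-in functions instead of the real servers. In standalone mode the embedded frontend starts alongside the gRPC server, and try_join! aborts both on the first error. Assumes tokio with the "macros" and "rt-multi-thread" features.

    use tokio::try_join;

    async fn start_grpc() -> Result<(), String> {
        // stand-in for self.grpc_server.start(grpc_addr)
        Ok(())
    }

    async fn start_frontend() -> Result<(), String> {
        // stand-in for frontend_instance.start()
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let standalone = true; // stands in for matching opts.mode against Mode::Standalone
        try_join!(start_grpc(), async {
            if standalone {
                start_frontend().await?;
            }
            Ok::<_, String>(())
        })?;
        Ok(())
    }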


@@ -12,9 +12,10 @@ impl SqlHandler {
pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result<Output> {
let ctx = EngineContext {};
let table_name = &req.table_name.clone();
if !self.table_engine.table_exists(&ctx, table_name) {
return error::TableNotFoundSnafu { table_name }.fail();
}
ensure!(
self.table_engine.table_exists(&ctx, table_name),
error::TableNotFoundSnafu { table_name }
);
self.table_engine
.alter_table(&ctx, req)
.await
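Note: ensure! from snafu replaces the hand-written if/return above. A self-contained sketch with a stand-in error type, assuming snafu 0.7:

    use snafu::{ensure, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Table not found: {}", table_name))]
        TableNotFound { table_name: String },
    }

    fn check_exists(exists: bool, table_name: &str) -> Result<(), Error> {
        // Bails out with the TableNotFound error when the condition is false,
        // exactly what the removed if/return did by hand.
        ensure!(exists, TableNotFoundSnafu { table_name });
        Ok(())
    }

    fn main() {
        assert!(check_exists(true, "demo").is_ok());
        assert!(check_exists(false, "demo").is_err());
    }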


@@ -30,11 +30,11 @@ impl TryFrom<&[u8]> for ColumnDefaultConstraint {
}
}
impl TryInto<Vec<u8>> for ColumnDefaultConstraint {
impl TryFrom<ColumnDefaultConstraint> for Vec<u8> {
type Error = error::Error;
fn try_into(self) -> Result<Vec<u8>> {
let s = serde_json::to_string(&self).context(error::SerializeSnafu)?;
fn try_from(value: ColumnDefaultConstraint) -> std::result::Result<Self, Self::Error> {
let s = serde_json::to_string(&value).context(error::SerializeSnafu)?;
Ok(s.into_bytes())
}
}
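Note: switching from a manual TryInto impl to TryFrom is the idiomatic direction, because the standard library's blanket impl derives TryInto from TryFrom (but not the reverse). A self-contained sketch with a stand-in type, assuming serde/serde_json and edition 2021:

    #[derive(serde::Serialize)]
    struct Constraint {
        value: i64, // stand-in for the real ColumnDefaultConstraint contents
    }

    impl TryFrom<Constraint> for Vec<u8> {
        type Error = serde_json::Error;

        fn try_from(value: Constraint) -> Result<Self, Self::Error> {
            // Same shape as the impl above: serialize with serde_json, then take the bytes.
            Ok(serde_json::to_string(&value)?.into_bytes())
        }
    }

    fn main() -> Result<(), serde_json::Error> {
        let c = Constraint { value: 42 };
        // The blanket impl gives us try_into for free once TryFrom exists.
        let bytes: Vec<u8> = c.try_into()?;
        assert_eq!(bytes, br#"{"value":42}"#.to_vec());
        Ok(())
    }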


@@ -21,7 +21,7 @@ common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datanode = { path = "../datanode" }
datatypes = { path = "../datatypes" }
futures = "0.3"
itertools = "0.10"
@@ -46,6 +46,7 @@ version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dev-dependencies]
datanode = { path = "../datanode" }
chrono = "0.4"
futures = "0.3"
meta-srv = { path = "../meta-srv", features = ["mock"] }


@@ -1,6 +1,5 @@
use std::sync::Arc;
use datanode::datanode::Mode;
use serde::{Deserialize, Serialize};
use snafu::prelude::*;
@@ -24,6 +23,7 @@ pub struct FrontendOptions {
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub mode: Mode,
pub datanode_rpc_addr: String,
}
impl Default for FrontendOptions {
@@ -37,14 +37,14 @@ impl Default for FrontendOptions {
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
mode: Mode::Standalone,
datanode_rpc_addr: "127.0.0.1:3001".to_string(),
}
}
}
impl FrontendOptions {
// TODO(LFC) Get Datanode address from Meta.
pub(crate) fn datanode_grpc_addr(&self) -> String {
"127.0.0.1:3001".to_string()
self.datanode_rpc_addr.clone()
}
}
@@ -74,9 +74,16 @@ where
.context(error::IllegalFrontendStateSnafu {
err_msg: "Frontend instance not initialized",
})?;
instance.start(&self.opts).await?;
instance.start().await?;
let instance = Arc::new(instance);
Services::start(&self.opts, instance).await
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
Standalone,
Distributed,
}
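Note: because Mode derives Serialize/Deserialize with rename_all = "lowercase", a config selects the mode with a lowercase string. A runnable sketch with stand-in types, using the toml crate purely for illustration (this diff does not show the actual config format):

    use serde::{Deserialize, Serialize};

    // Mirrors the Mode enum above and the `mode` field of the options structs.
    #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
    #[serde(rename_all = "lowercase")]
    enum Mode {
        Standalone,
        Distributed,
    }

    #[derive(Debug, Deserialize)]
    struct Conf {
        mode: Mode,
    }

    fn main() {
        // rename_all = "lowercase" means config files spell the variants in lowercase.
        let conf: Conf = toml::from_str(r#"mode = "distributed""#).unwrap();
        assert_eq!(conf.mode, Mode::Distributed);
    }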


@@ -8,8 +8,8 @@ use std::time::Duration;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{
insert_expr, AdminExpr, AdminResult, ColumnDataType, ColumnDef as GrpcColumnDef, CreateExpr,
InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
insert_expr, AdminExpr, AdminResult, AlterExpr, ColumnDataType, ColumnDef as GrpcColumnDef,
CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
@@ -18,9 +18,9 @@ use client::{Client, Database, Select};
use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_query::Output;
use datanode::datanode::{MetaClientOpts, Mode};
use datatypes::schema::ColumnSchema;
use meta_client::client::MetaClientBuilder;
use meta_client::MetaClientOpts;
use servers::error as server_error;
use servers::query_handler::{
GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler,
@@ -36,7 +36,7 @@ use sql::{dialect::GenericDialect, parser::ParserContext};
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result};
use crate::frontend::FrontendOptions;
use crate::frontend::{FrontendOptions, Mode};
use crate::table::route::TableRoutes;
#[async_trait]
@@ -51,36 +51,25 @@ pub trait FrontendInstance:
+ Sync
+ 'static
{
async fn start(&mut self, opts: &FrontendOptions) -> Result<()>;
async fn start(&mut self) -> Result<()>;
}
pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
#[derive(Default)]
pub struct Instance {
// TODO(hl): In standalone mode, there is only one client.
// But in distribute mode, frontend should fetch datanodes' addresses from metasrv.
client: Client,
/// catalog manager is None in standalone mode, datanode will keep their own
catalog_manager: Option<FrontendCatalogManager>,
}
impl Instance {
pub fn new() -> Self {
Default::default()
}
// TODO(fys): temporarily hard code
pub fn database(&self) -> Database {
Database::new("greptime", self.client.clone())
}
// TODO(fys): temporarily hard code
pub fn admin(&self) -> Admin {
Admin::new("greptime", self.client.clone())
}
}
#[async_trait]
impl FrontendInstance for Instance {
async fn start(&mut self, opts: &FrontendOptions) -> Result<()> {
pub async fn try_new(opts: &FrontendOptions) -> Result<Self> {
let mut instance = Instance::default();
let addr = opts.datanode_grpc_addr();
self.client.start(vec![addr]);
instance.client.start(vec![addr]);
let meta_client = match opts.mode {
Mode::Standalone => None,
@@ -102,7 +91,7 @@ impl FrontendInstance for Instance {
}
};
self.catalog_manager = if let Some(meta_client) = meta_client {
instance.catalog_manager = if let Some(meta_client) = meta_client {
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
@@ -114,6 +103,24 @@ impl FrontendInstance for Instance {
} else {
None
};
Ok(instance)
}
// TODO(fys): temporarily hard code
pub fn database(&self) -> Database {
Database::new("greptime", self.client.clone())
}
// TODO(fys): temporarily hard code
pub fn admin(&self) -> Admin {
Admin::new("greptime", self.client.clone())
}
}
#[async_trait]
impl FrontendInstance for Instance {
async fn start(&mut self) -> Result<()> {
// TODO(hl): Frontend init should move to here
Ok(())
}
}
@@ -179,9 +186,34 @@ impl SqlQueryHandler for Instance {
.await
.and_then(admin_result_to_output)
}
// TODO(LFC): Support other SQL execution,
// update, delete, alter, explain, etc.
_ => return server_error::NotSupportedSnafu { feat: query }.fail(),
Statement::ShowDatabases(_) | Statement::ShowTables(_) => self
.database()
.select(Select::Sql(query.to_string()))
.await
.and_then(|object_result| object_result.try_into()),
Statement::CreateDatabase(c) => {
let expr = CreateDatabaseExpr {
database_name: c.name.to_string(),
};
self.admin()
.create_database(expr)
.await
.and_then(admin_result_to_output)
}
Statement::Alter(alter_stmt) => self
.admin()
.alter(
AlterExpr::try_from(alter_stmt)
.map_err(BoxedError::new)
.context(server_error::ExecuteAlterSnafu { query })?,
)
.await
.and_then(admin_result_to_output),
Statement::ShowCreateTable(_) => {
return server_error::NotSupportedSnafu { feat: query }.fail()
}
}
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
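Note: a simplified, stand-in sketch of the statement routing added above; it is not the real sqlparser Statement type, just an illustration of which statements now go over the admin channel versus the query channel.

    enum Statement {
        CreateDatabase { name: String },   // e.g. CREATE DATABASE my_db
        Alter { table: String },           // e.g. ALTER TABLE demo ADD COLUMN memory DOUBLE
        ShowDatabases,                     // e.g. SHOW DATABASES
        ShowCreateTable { table: String }, // still rejected as unsupported
    }

    #[derive(Debug, PartialEq)]
    enum Route {
        Admin,       // goes through Admin::create_database / Admin::alter
        Query,       // goes through Database::select
        Unsupported, // returns a NotSupported error
    }

    fn route(stmt: &Statement) -> Route {
        match stmt {
            Statement::CreateDatabase { .. } | Statement::Alter { .. } => Route::Admin,
            Statement::ShowDatabases => Route::Query,
            Statement::ShowCreateTable { .. } => Route::Unsupported,
        }
    }

    fn main() {
        assert_eq!(route(&Statement::CreateDatabase { name: "my_db".into() }), Route::Admin);
        assert_eq!(route(&Statement::ShowDatabases), Route::Query);
    }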


@@ -2,6 +2,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::info;
use servers::grpc::GrpcServer;
use servers::http::HttpServer;
use servers::mysql::server::MysqlServer;
@@ -24,6 +25,7 @@ impl Services {
where
T: FrontendInstance,
{
info!("Starting frontend servers");
let grpc_server_and_addr = if let Some(opts) = &opts.grpc_options {
let grpc_addr = parse_addr(&opts.addr)?;
@@ -143,7 +145,9 @@ async fn start_server(
server_and_addr: Option<(Box<dyn Server>, SocketAddr)>,
) -> servers::error::Result<Option<SocketAddr>> {
if let Some((server, addr)) = server_and_addr {
server.start(addr).await.map(Some)
let res = server.start(addr).await.map(Some)?;
info!("Starting server at {}", addr);
Ok(res)
} else {
Ok(None)
}


@@ -1,5 +1,27 @@
use serde::{Deserialize, Serialize};
pub mod client;
pub mod error;
#[cfg(test)]
mod mocks;
pub mod rpc;
// Options for meta client in datanode instance.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MetaClientOpts {
pub metasrv_addr: String,
pub timeout_millis: u64,
pub connect_timeout_millis: u64,
pub tcp_nodelay: bool,
}
impl Default for MetaClientOpts {
fn default() -> Self {
Self {
metasrv_addr: "127.0.0.1:3002".to_string(),
timeout_millis: 3_000u64,
connect_timeout_millis: 5_000u64,
tcp_nodelay: true,
}
}
}
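Note: with MetaClientOpts relocated into the meta-client crate, both datanode and frontend can share it. A small sketch, using a stand-in copy of the struct, of overriding one field while keeping the defaults shown above:

    #[derive(Debug)]
    struct MetaClientOpts {
        metasrv_addr: String,
        timeout_millis: u64,
        connect_timeout_millis: u64,
        tcp_nodelay: bool,
    }

    impl Default for MetaClientOpts {
        fn default() -> Self {
            Self {
                metasrv_addr: "127.0.0.1:3002".to_string(),
                timeout_millis: 3_000,
                connect_timeout_millis: 5_000,
                tcp_nodelay: true,
            }
        }
    }

    fn main() {
        // Struct-update syntax: override one field, keep every other default from above.
        let opts = MetaClientOpts {
            metasrv_addr: "10.0.0.5:3002".to_string(), // placeholder metasrv address
            ..Default::default()
        };
        assert_eq!(opts.timeout_millis, 3_000);
        assert!(opts.tcp_nodelay);
    }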


@@ -68,6 +68,13 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to execute alter: {}, source: {}", query, source))]
ExecuteAlter {
query: String,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to insert script with name: {}, source: {}", name, source))]
InsertScript {
name: String,
@@ -161,6 +168,12 @@ pub enum Error {
source: tonic_reflection::server::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to start frontend service, source: {}", source))]
StartFrontend {
#[snafu(backtrace)]
source: BoxedError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -185,6 +198,7 @@ impl ErrorExt for Error {
| ExecuteScript { source, .. }
| ExecuteQuery { source, .. }
| ExecuteInsert { source, .. }
| ExecuteAlter { source, .. }
| PutOpentsdbDataPoint { source, .. } => source.status_code(),
NotSupported { .. }
@@ -201,6 +215,7 @@ impl ErrorExt for Error {
InfluxdbLinesWrite { source, .. } => source.status_code(),
Hyper { .. } => StatusCode::Unknown,
StartFrontend { source, .. } => source.status_code(),
}
}


@@ -5,6 +5,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
api = { path = "../api" }
catalog = { path = "../catalog" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }


@@ -92,6 +92,24 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Unsupported ALTER TABLE statement: {}", msg))]
UnsupportedAlterTableStatement { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to serialize column default constraint, source: {}", source))]
SerializeColumnDefaultConstraint {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
"Failed to convert data type to gRPC data type defined in proto, source: {}",
source
))]
ConvertToGrpcDataType {
#[snafu(backtrace)]
source: api::error::Error,
},
}
impl ErrorExt for Error {
@@ -112,6 +130,9 @@ impl ErrorExt for Error {
InvalidDatabaseName { .. } | ColumnTypeMismatch { .. } | InvalidTableName { .. } => {
StatusCode::InvalidArguments
}
UnsupportedAlterTableStatement { .. } => StatusCode::InvalidSyntax,
SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
ConvertToGrpcDataType { source, .. } => source.status_code(),
}
}


@@ -7,6 +7,7 @@ pub mod statement;
use std::str::FromStr;
use api::helper::ColumnDataTypeWrapper;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
@@ -20,7 +21,8 @@ use crate::ast::{
Value as SqlValue,
};
use crate::error::{
self, ColumnTypeMismatchSnafu, ParseSqlValueSnafu, Result, UnsupportedDefaultValueSnafu,
self, ColumnTypeMismatchSnafu, ConvertToGrpcDataTypeSnafu, ParseSqlValueSnafu, Result,
SerializeColumnDefaultConstraintSnafu, UnsupportedDefaultValueSnafu,
};
/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>` or `<table>` when
@@ -239,6 +241,31 @@ pub fn column_def_to_schema(column_def: &ColumnDef, is_time_index: bool) -> Resu
})
}
/// Convert `ColumnDef` in sqlparser to `ColumnDef` in gRPC proto.
fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result<api::v1::ColumnDef> {
let name = col.name.value.clone();
let data_type = sql_data_type_to_concrete_data_type(&col.data_type)?;
let nullable = col
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Null));
let default_constraint = parse_column_default_constraint(&name, &data_type, &col.options)?
.map(ColumnDefaultConstraint::try_into) // serialize default constraint to bytes
.transpose()
.context(SerializeColumnDefaultConstraintSnafu)?;
let data_type = ColumnDataTypeWrapper::try_from(data_type)
.context(ConvertToGrpcDataTypeSnafu)?
.datatype() as i32;
Ok(api::v1::ColumnDef {
name,
datatype: data_type,
is_nullable: nullable,
default_constraint,
})
}
pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<ConcreteDataType> {
match data_type {
SqlDataType::BigInt(_) => Ok(ConcreteDataType::int64_datatype()),

View File

@@ -1,5 +1,9 @@
use api::v1::{alter_expr, AlterExpr};
use sqlparser::ast::{ColumnDef, ObjectName, TableConstraint};
use crate::error::UnsupportedAlterTableStatementSnafu;
use crate::statements::{sql_column_def_to_grpc_column_def, table_idents_to_full_name};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AlterTable {
table_name: ObjectName,
@@ -29,4 +33,36 @@ pub enum AlterTableOperation {
AddConstraint(TableConstraint),
/// `ADD [ COLUMN ] <column_def>`
AddColumn { column_def: ColumnDef },
// TODO(hl): support remove column
}
/// Convert `AlterTable` statement to `AlterExpr` for gRPC
impl TryFrom<AlterTable> for AlterExpr {
type Error = crate::error::Error;
fn try_from(value: AlterTable) -> Result<Self, Self::Error> {
let (catalog, schema, table) = table_idents_to_full_name(&value.table_name)?;
let kind = match value.alter_operation {
AlterTableOperation::AddConstraint(_) => {
return UnsupportedAlterTableStatementSnafu {
msg: "ADD CONSTRAINT not supported yet.",
}
.fail();
}
AlterTableOperation::AddColumn { column_def } => {
alter_expr::Kind::AddColumn(api::v1::AddColumn {
column_def: Some(sql_column_def_to_grpc_column_def(column_def)?),
})
}
};
let expr = AlterExpr {
catalog_name: Some(catalog),
schema_name: Some(schema),
table_name: table,
kind: Some(kind),
};
Ok(expr)
}
}
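Note: a minimal sketch of the TryFrom pattern used above, where unsupported variants become a typed error instead of being silently dropped; the types are stand-ins, not the real AlterTable/AlterExpr, and assume edition 2021.

    #[derive(Debug)]
    enum AlterTableOperation {
        AddColumn { name: String },
        AddConstraint,
    }

    #[derive(Debug)]
    struct AlterExpr {
        table_name: String,
        add_column: String,
    }

    #[derive(Debug)]
    struct UnsupportedError(&'static str);

    impl TryFrom<(String, AlterTableOperation)> for AlterExpr {
        type Error = UnsupportedError;

        fn try_from((table_name, op): (String, AlterTableOperation)) -> Result<Self, Self::Error> {
            match op {
                AlterTableOperation::AddColumn { name } => Ok(AlterExpr {
                    table_name,
                    add_column: name,
                }),
                // Constraints cannot be expressed in the gRPC expression yet, so the
                // conversion fails loudly rather than dropping the clause.
                AlterTableOperation::AddConstraint => {
                    Err(UnsupportedError("ADD CONSTRAINT not supported yet"))
                }
            }
        }
    }

    fn main() {
        let ok = AlterExpr::try_from((
            "demo".to_string(),
            AlterTableOperation::AddColumn { name: "memory".to_string() },
        ));
        assert!(ok.is_ok());

        let err = AlterExpr::try_from(("demo".to_string(), AlterTableOperation::AddConstraint));
        assert!(err.is_err());
    }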