diff --git a/Cargo.lock b/Cargo.lock
index b1a8f2b8c3..5ea4f8e485 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1179,10 +1179,13 @@ dependencies = [
  "async-trait",
  "common-base",
  "common-error",
+ "common-query",
+ "common-recordbatch",
  "common-runtime",
  "criterion 0.4.0",
  "dashmap",
  "datafusion",
+ "datatypes",
  "rand 0.8.5",
  "snafu",
  "tokio",
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index e0503f3d8d..df8e5a94fb 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -1,9 +1,9 @@
 node_id = 42
 mode = 'distributed'
-rpc_addr = '0.0.0.0:3001'
+rpc_addr = '127.0.0.1:3001'
 wal_dir = '/tmp/greptimedb/wal'
 rpc_runtime_size = 8
-mysql_addr = '0.0.0.0:3306'
+mysql_addr = '127.0.0.1:3306'
 mysql_runtime_size = 4
 
 [storage]
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index a9cb1e969f..87719e8b6d 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -1,6 +1,6 @@
 mode = 'distributed'
 datanode_rpc_addr = '127.0.0.1:3001'
-http_addr = '0.0.0.0:4000'
+http_addr = '127.0.0.1:4000'
 
 [meta_client_opts]
 metasrv_addr = '1.1.1.1:3002'
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 00ca366dc0..5531e6e4a6 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -1,4 +1,4 @@
 bind_addr = '127.0.0.1:3002'
-server_addr = '0.0.0.0:3002'
-store_addr = '127.0.0.1:2380'
-datanode_lease_secs = 30
+server_addr = '127.0.0.1:3002'
+store_addr = '127.0.0.1:2379'
+datanode_lease_secs = 15
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 1c3d3f3fad..fa7ab87256 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -1,7 +1,7 @@
 node_id = 0
 mode = 'standalone'
-http_addr = '0.0.0.0:4000'
-datanode_mysql_addr = '0.0.0.0:3306'
+http_addr = '127.0.0.1:4000'
+datanode_mysql_addr = '127.0.0.1:3306'
 datanode_mysql_runtime_size = 4
 wal_dir = '/tmp/greptimedb/wal/'
 
@@ -10,18 +10,18 @@ type = 'File'
 data_dir = '/tmp/greptimedb/data/'
 
 [grpc_options]
-addr = '0.0.0.0:4001'
+addr = '127.0.0.1:4001'
 runtime_size = 8
 
 [mysql_options]
-addr = '0.0.0.0:4002'
+addr = '127.0.0.1:4002'
 runtime_size = 2
 
 [influxdb_options]
 enable = true
 
 [opentsdb_options]
-addr = '0.0.0.0:4242'
+addr = '127.0.0.1:4242'
 enable = true
 runtime_size = 2
 
@@ -29,6 +29,6 @@ runtime_size = 2
 enable = true
 
 [postgres_options]
-addr = '0.0.0.0:4003'
+addr = '127.0.0.1:4003'
 runtime_size = 2
 check_pwd = false
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index 689edfe145..3415acd2ec 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 
 use api::v1::codec::SelectResult as GrpcSelectResult;
+use api::v1::column::SemanticType;
 use api::v1::{
     object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr,
     MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan,
@@ -219,7 +220,12 @@ impl TryFrom<ObjectResult> for Output {
             .map(|(column, vector)| {
                 let datatype = vector.data_type();
                 // nullable or not, does not affect the output
-                ColumnSchema::new(&column.column_name, datatype, true)
+                let mut column_schema =
+                    ColumnSchema::new(&column.column_name, datatype, true);
+                if column.semantic_type == SemanticType::Timestamp as i32 {
+                    column_schema = column_schema.with_time_index(true);
+                }
+                column_schema
             })
             .collect::<Vec<ColumnSchema>>();
 
@@ -251,7 +257,7 @@ mod tests {
     use api::helper::ColumnDataTypeWrapper;
     use api::v1::Column;
-    use datanode::server::grpc::select::{null_mask, values};
+    use common_grpc::select::{null_mask, values};
     use datatypes::vectors::{
         BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
         Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, UInt16Vector,
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index c08d41539a..692c120b64 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -145,9 +145,9 @@ mod tests {
             )),
         };
         let options: DatanodeOptions = cmd.try_into().unwrap();
-        assert_eq!("0.0.0.0:3001".to_string(), options.rpc_addr);
+        assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
         assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir);
-        assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr);
+        assert_eq!("127.0.0.1:3306".to_string(), options.mysql_addr);
         assert_eq!(4, options.mysql_runtime_size);
         assert_eq!(
             "1.1.1.1:3002".to_string(),
diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs
index a6cb988cc1..3d9018527d 100644
--- a/src/cmd/src/metasrv.rs
+++ b/src/cmd/src/metasrv.rs
@@ -104,13 +104,13 @@ mod tests {
     fn test_read_from_cmd() {
         let cmd = StartCommand {
             bind_addr: Some("127.0.0.1:3002".to_string()),
-            server_addr: Some("0.0.0.0:3002".to_string()),
+            server_addr: Some("127.0.0.1:3002".to_string()),
             store_addr: Some("127.0.0.1:2380".to_string()),
             config_file: None,
         };
         let options: MetaSrvOptions = cmd.try_into().unwrap();
         assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
-        assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
+        assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
         assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
     }
 
@@ -127,8 +127,8 @@ mod tests {
         };
         let options: MetaSrvOptions = cmd.try_into().unwrap();
         assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
-        assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
-        assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
-        assert_eq!(30, options.datanode_lease_secs);
+        assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
+        assert_eq!("127.0.0.1:2379".to_string(), options.store_addr);
+        assert_eq!(15, options.datanode_lease_secs);
     }
 }
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 60c6fae6fe..c0152273b9 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -77,7 +77,7 @@ pub struct StandaloneOptions {
 impl Default for StandaloneOptions {
     fn default() -> Self {
         Self {
-            http_addr: Some("0.0.0.0:4000".to_string()),
+            http_addr: Some("127.0.0.1:4000".to_string()),
             grpc_options: Some(GrpcOptions::default()),
             mysql_options: Some(MysqlOptions::default()),
             postgres_options: Some(PostgresOptions::default()),
@@ -87,7 +87,7 @@ impl Default for StandaloneOptions {
             mode: Mode::Standalone,
             wal_dir: "/tmp/greptimedb/wal".to_string(),
             storage: ObjectStoreConfig::default(),
-            datanode_mysql_addr: "0.0.0.0:3306".to_string(),
+            datanode_mysql_addr: "127.0.0.1:3306".to_string(),
             datanode_mysql_runtime_size: 4,
         }
     }
@@ -274,12 +274,15 @@ mod tests {
         let fe_opts = FrontendOptions::try_from(cmd).unwrap();
         assert_eq!(Mode::Standalone, fe_opts.mode);
         assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr);
-        assert_eq!(Some("0.0.0.0:4000".to_string()), fe_opts.http_addr);
+        assert_eq!(Some("127.0.0.1:4000".to_string()), fe_opts.http_addr);
         assert_eq!(
-            "0.0.0.0:4001".to_string(),
+            "127.0.0.1:4001".to_string(),
             fe_opts.grpc_options.unwrap().addr
         );
-        assert_eq!("0.0.0.0:4002", fe_opts.mysql_options.as_ref().unwrap().addr);
+        assert_eq!(
+            "127.0.0.1:4002",
+            fe_opts.mysql_options.as_ref().unwrap().addr
+        );
         assert_eq!(2, fe_opts.mysql_options.as_ref().unwrap().runtime_size);
         assert!(fe_opts.influxdb_options.as_ref().unwrap().enable);
     }
diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml
index 0e1c8283d4..eef5357a3f 100644
--- a/src/common/grpc/Cargo.toml
+++ b/src/common/grpc/Cargo.toml
@@ -9,7 +9,10 @@ api = { path = "../../api" }
 async-trait = "0.1"
 common-base = { path = "../base" }
 common-error = { path = "../error" }
+common-query = { path = "../query" }
+common-recordbatch = { path = "../recordbatch" }
 common-runtime = { path = "../runtime" }
+datatypes = { path = "../../datatypes" }
 dashmap = "5.4"
 datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
     "simd",
diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs
index 9ec77d7355..bfc326e597 100644
--- a/src/common/grpc/src/error.rs
+++ b/src/common/grpc/src/error.rs
@@ -69,6 +69,21 @@ pub enum Error {
         source: tonic::transport::Error,
         backtrace: Backtrace,
     },
+
+    #[snafu(display("Failed to collect RecordBatches, source: {}", source))]
+    CollectRecordBatches {
+        #[snafu(backtrace)]
+        source: common_recordbatch::error::Error,
+    },
+
+    #[snafu(display("Failed to convert Arrow type: {}", from))]
+    Conversion { from: String, backtrace: Backtrace },
+
+    #[snafu(display("Column datatype error, source: {}", source))]
+    ColumnDataType {
+        #[snafu(backtrace)]
+        source: api::error::Error,
+    },
 }
 
 impl ErrorExt for Error {
@@ -83,7 +98,10 @@ impl ErrorExt for Error {
             }
             Error::NewProjection { .. }
             | Error::DecodePhysicalPlanNode { .. }
-            | Error::CreateChannel { .. } => StatusCode::Internal,
+            | Error::CreateChannel { .. }
+            | Error::Conversion { .. } => StatusCode::Internal,
+            Error::CollectRecordBatches { source } => source.status_code(),
+            Error::ColumnDataType { source } => source.status_code(),
         }
     }
diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs
index f489a9ee93..8444de9086 100644
--- a/src/common/grpc/src/lib.rs
+++ b/src/common/grpc/src/lib.rs
@@ -15,6 +15,7 @@
 pub mod channel_manager;
 pub mod error;
 pub mod physical;
+pub mod select;
 pub mod writer;
 
 pub use error::Error;
diff --git a/src/datanode/src/server/grpc/select.rs b/src/common/grpc/src/select.rs
similarity index 98%
rename from src/datanode/src/server/grpc/select.rs
rename to src/common/grpc/src/select.rs
index 769cb62f08..58a3f10bfa 100644
--- a/src/datanode/src/server/grpc/select.rs
+++ b/src/common/grpc/src/select.rs
@@ -21,6 +21,7 @@ use api::v1::column::{SemanticType, Values};
 use api::v1::{Column, ObjectResult};
 use arrow::array::{Array, BooleanArray, PrimitiveArray};
 use common_base::BitVec;
+use common_error::prelude::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_query::Output;
 use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream};
@@ -30,7 +31,7 @@ use snafu::{OptionExt, ResultExt};
 
 use crate::error::{self, ConversionSnafu, Result};
 
-pub async fn to_object_result(output: Result<Output>) -> ObjectResult {
+pub async fn to_object_result(output: std::result::Result<Output, impl ErrorExt>) -> ObjectResult {
     let result = match output {
         Ok(Output::AffectedRows(rows)) => Ok(ObjectResultBuilder::new()
             .status_code(StatusCode::Success as u32)
@@ -208,7 +209,7 @@ mod tests {
     use datatypes::schema::Schema;
     use datatypes::vectors::{UInt32Vector, VectorRef};
 
-    use crate::server::grpc::select::{null_mask, try_convert, values};
+    use crate::select::{null_mask, try_convert, values};
 
     #[test]
     fn test_convert_record_batches_to_select_result() {
diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs
index e90df0c300..90a127a64c 100644
--- a/src/common/query/src/error.rs
+++ b/src/common/query/src/error.rs
@@ -114,6 +114,12 @@ pub enum InnerError {
         #[snafu(backtrace)]
         source: DataTypeError,
     },
+
+    #[snafu(display("Failed to execute physical plan, source: {}", source))]
+    ExecutePhysicalPlan {
+        #[snafu(backtrace)]
+        source: BoxedError,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -141,6 +147,7 @@
             InnerError::UnsupportedInputDataType { .. } => StatusCode::InvalidArguments,
             InnerError::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
+            InnerError::ExecutePhysicalPlan { source } => source.status_code(),
         }
     }
@@ -165,6 +172,12 @@ impl From<Error> for DataFusionError {
     }
 }
 
+impl From<BoxedError> for Error {
+    fn from(source: BoxedError) -> Self {
+        InnerError::ExecutePhysicalPlan { source }.into()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::error::ArrowError;
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 78da04cea7..3eb9a967dc 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -55,9 +55,9 @@ impl Default for DatanodeOptions {
     fn default() -> Self {
         Self {
             node_id: 0,
-            rpc_addr: "0.0.0.0:3001".to_string(),
+            rpc_addr: "127.0.0.1:3001".to_string(),
             rpc_runtime_size: 8,
-            mysql_addr: "0.0.0.0:3306".to_string(),
+            mysql_addr: "127.0.0.1:3306".to_string(),
             mysql_runtime_size: 2,
             meta_client_opts: MetaClientOpts::default(),
             wal_dir: "/tmp/greptimedb/wal".to_string(),
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index f5593db57c..d837b7799d 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -145,9 +145,6 @@
         backtrace: Backtrace,
     },
 
-    #[snafu(display("Failed to convert datafusion type: {}", from))]
-    Conversion { from: String },
-
     #[snafu(display("Unsupported expr type: {}", name))]
     UnsupportedExpr { name: String },
 
@@ -229,12 +226,6 @@
         source: script::error::Error,
     },
 
-    #[snafu(display("Failed to collect RecordBatches, source: {}", source))]
-    CollectRecordBatches {
-        #[snafu(backtrace)]
-        source: common_recordbatch::error::Error,
-    },
-
     #[snafu(display(
         "Failed to parse string to timestamp, string: {}, source: {}",
         raw,
@@ -338,7 +329,6 @@ impl ErrorExt for Error {
             | Error::CreateDir { .. }
             | Error::InsertSystemCatalog { .. }
             | Error::RegisterSchema { .. }
-            | Error::Conversion { .. }
             | Error::IntoPhysicalPlan { .. }
             | Error::UnsupportedExpr { .. }
             | Error::ColumnDataType { .. }
@@ -349,8 +339,6 @@
             Error::StartScriptManager { source } => source.status_code(),
             Error::OpenStorageEngine { source } => source.status_code(),
             Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
-            Error::CollectRecordBatches { source } => source.status_code(),
-
             Error::MetaClientInit { source, .. } => source.status_code(),
             Error::InsertData { source, .. } => source.status_code(),
             Error::EmptyInsertBatch => StatusCode::InvalidArguments,
diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs
index 5a58709d45..c863743d29 100644
--- a/src/datanode/src/instance/grpc.rs
+++ b/src/datanode/src/instance/grpc.rs
@@ -21,6 +21,7 @@ use async_trait::async_trait;
 use common_catalog::consts::DEFAULT_CATALOG_NAME;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
+use common_grpc::select::to_object_result;
 use common_insert::insertion_expr_to_request;
 use common_query::Output;
 use query::plan::LogicalPlan;
@@ -36,7 +37,6 @@ use crate::error::{
 };
 use crate::instance::Instance;
 use crate::server::grpc::plan::PhysicalPlanner;
-use crate::server::grpc::select::to_object_result;
 
 impl Instance {
     pub async fn execute_grpc_insert(
diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs
index 2900ba751c..1a75f4c571 100644
--- a/src/datanode/src/server/grpc.rs
+++ b/src/datanode/src/server/grpc.rs
@@ -14,4 +14,3 @@
 
 mod ddl;
 pub(crate) mod plan;
-pub mod select;
diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs
index ddd3e3f4bc..e14a3d8e84 100644
--- a/src/datatypes/src/data_type.rs
+++ b/src/datatypes/src/data_type.rs
@@ -70,7 +70,7 @@ impl ConcreteDataType {
         matches!(self, ConcreteDataType::Boolean(_))
     }
 
-    pub fn is_string(&self) -> bool {
+    pub fn stringifiable(&self) -> bool {
         matches!(
             self,
             ConcreteDataType::String(_)
diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index ad5ba0a6e7..8b71372e50 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -389,6 +389,30 @@
         #[snafu(backtrace)]
         source: datatypes::error::Error,
     },
+
+    #[snafu(display("Failed to build DataFusion logical plan, source: {}", source))]
+    BuildDfLogicalPlan {
+        source: datafusion_common::DataFusionError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to convert Arrow schema, source: {}", source))]
+    ConvertArrowSchema {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display("Failed to collect Recordbatch stream, source: {}", source))]
+    CollectRecordbatchStream {
+        #[snafu(backtrace)]
+        source: common_recordbatch::error::Error,
+    },
+
+    #[snafu(display("Failed to create Recordbatches, source: {}", source))]
+    CreateRecordbatches {
+        #[snafu(backtrace)]
+        source: common_recordbatch::error::Error,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -418,7 +442,8 @@ impl ErrorExt for Error {
             Error::ConvertColumnDefaultConstraint { source, .. }
             | Error::ConvertScalarValue { source, .. }
-            | Error::VectorComputation { source } => source.status_code(),
+            | Error::VectorComputation { source }
+            | Error::ConvertArrowSchema { source } => source.status_code(),
 
             Error::ConnectDatanode { source, .. }
             | Error::RequestDatanode { source }
@@ -434,7 +459,8 @@
             | Error::FindLeaderPeer { .. }
             | Error::FindRegionPartition { .. }
             | Error::IllegalTableRoutesData { .. }
-            | Error::UnsupportedExpr { .. } => StatusCode::Internal,
+            | Error::UnsupportedExpr { .. }
+            | Error::BuildDfLogicalPlan { .. } => StatusCode::Internal,
             Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => {
                 StatusCode::Unexpected
             }
@@ -467,6 +493,9 @@
             Error::ExecuteSql { source, .. } => source.status_code(),
             Error::InsertBatchToRequest { source, .. } => source.status_code(),
             Error::CreateDatabase { source, .. } => source.status_code(),
+            Error::CollectRecordbatchStream { source } | Error::CreateRecordbatches { source } => {
+                source.status_code()
+            }
         }
     }
diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs
index c35b9a61c5..f9ee4c3a7c 100644
--- a/src/frontend/src/frontend.rs
+++ b/src/frontend/src/frontend.rs
@@ -44,7 +44,7 @@ pub struct FrontendOptions {
 impl Default for FrontendOptions {
     fn default() -> Self {
         Self {
-            http_addr: Some("0.0.0.0:4000".to_string()),
+            http_addr: Some("127.0.0.1:4000".to_string()),
             grpc_options: Some(GrpcOptions::default()),
             mysql_options: Some(MysqlOptions::default()),
             postgres_options: Some(PostgresOptions::default()),
diff --git a/src/frontend/src/grpc.rs b/src/frontend/src/grpc.rs
index 8c20556f1a..49044dfc4f 100644
--- a/src/frontend/src/grpc.rs
+++ b/src/frontend/src/grpc.rs
@@ -23,7 +23,7 @@ pub struct GrpcOptions {
 impl Default for GrpcOptions {
     fn default() -> Self {
         Self {
-            addr: "0.0.0.0:4001".to_string(),
+            addr: "127.0.0.1:4001".to_string(),
             runtime_size: 8,
         }
     }
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index e34e8a271f..51fbe1706c 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -26,8 +26,8 @@ use api::v1::alter_expr::Kind;
 use api::v1::codec::InsertBatch;
 use api::v1::object_expr::Expr;
 use api::v1::{
-    admin_expr, insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr,
-    CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
+    admin_expr, insert_expr, select_expr, AddColumns, AdminExpr, AdminResult, AlterExpr,
+    CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
 };
 use async_trait::async_trait;
 use catalog::remote::MetaKvBackend;
@@ -37,6 +37,7 @@ use client::{Client, Database, Select};
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::prelude::{BoxedError, StatusCode};
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
+use common_grpc::select::to_object_result;
 use common_query::Output;
 use common_telemetry::{debug, error, info};
 use distributed::DistInstance;
@@ -531,21 +532,26 @@ impl Instance {
     }
 }
 
+fn parse_stmt(sql: &str) -> Result<Statement> {
+    let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
+        .context(error::ParseSqlSnafu)?;
+    // TODO(LFC): Support executing multiple SQL queries,
+    // which seems to be a major change to our whole server framework?
+    ensure!(
+        stmt.len() == 1,
+        error::InvalidSqlSnafu {
+            err_msg: "Currently executing multiple SQL queries are not supported."
+        }
+    );
+    Ok(stmt.remove(0))
+}
+
 #[async_trait]
 impl SqlQueryHandler for Instance {
     async fn do_query(&self, query: &str) -> server_error::Result<Output> {
-        let mut stmt = ParserContext::create_with_dialect(query, &GenericDialect {})
+        let stmt = parse_stmt(query)
             .map_err(BoxedError::new)
             .context(server_error::ExecuteQuerySnafu { query })?;
-        if stmt.len() != 1 {
-            // TODO(LFC): Support executing multiple SQLs,
-            // which seems to be a major change to our whole server framework?
-            return server_error::NotSupportedSnafu {
-                feat: "Only one SQL is allowed to be executed at one time.",
-            }
-            .fail();
-        }
-        let stmt = stmt.remove(0);
 
         match stmt {
             Statement::Query(_) => self
@@ -680,16 +686,40 @@
                         query: format!("{:?}", query),
                     })
                 }
-
-                // FIXME(hl): refactor
-                _ => self
-                    .database(DEFAULT_SCHEMA_NAME)
-                    .object(query.clone())
-                    .await
-                    .map_err(BoxedError::new)
-                    .with_context(|_| server_error::ExecuteQuerySnafu {
-                        query: format!("{:?}", query),
-                    }),
+                Expr::Select(select) => {
+                    let select = select
+                        .expr
+                        .as_ref()
+                        .context(server_error::InvalidQuerySnafu {
+                            reason: "empty query",
+                        })?;
+                    match select {
+                        select_expr::Expr::Sql(sql) => {
+                            let output = SqlQueryHandler::do_query(self, sql).await;
+                            Ok(to_object_result(output).await)
+                        }
+                        _ => {
+                            if self.dist_instance.is_some() {
+                                return server_error::NotSupportedSnafu {
+                                    feat: "Executing plan directly in Frontend.",
+                                }
+                                .fail();
+                            }
+                            // FIXME(hl): refactor
+                            self.database(DEFAULT_SCHEMA_NAME)
+                                .object(query.clone())
+                                .await
+                                .map_err(BoxedError::new)
+                                .with_context(|_| server_error::ExecuteQuerySnafu {
+                                    query: format!("{:?}", query),
+                                })
+                        }
+                    }
+                }
+                _ => server_error::NotSupportedSnafu {
+                    feat: "Currently only insert and select is supported in GRPC service.",
+                }
+                .fail(),
             }
         } else {
             server_error::InvalidQuerySnafu {
diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs
index 224cb6bed7..e0b81c008c 100644
--- a/src/frontend/src/instance/prometheus.rs
+++ b/src/frontend/src/instance/prometheus.rs
@@ -15,9 +15,11 @@
 use api::prometheus::remote::read_request::ResponseType;
 use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
 use async_trait::async_trait;
-use client::{Database, ObjectResult, Select};
+use client::{ObjectResult, Select};
 use common_error::prelude::BoxedError;
+use common_grpc::select::to_object_result;
 use common_telemetry::logging;
+use futures_util::TryFutureExt;
 use prost::Message;
 use servers::error::{self, Result as ServerResult};
 use servers::prometheus::{self, Metrics};
@@ -25,7 +27,7 @@
 use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
 use snafu::{OptionExt, ResultExt};
 
 use crate::frontend::Mode;
-use crate::instance::Instance;
+use crate::instance::{parse_stmt, Instance};
 
 const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
@@ -73,31 +75,37 @@ fn object_result_to_query_result(
     })
 }
 
-async fn handle_remote_queries(
-    db: &Database,
-    queries: &[Query],
-) -> ServerResult<Vec<(String, ObjectResult)>> {
-    let mut results = Vec::with_capacity(queries.len());
+impl Instance {
+    async fn handle_remote_queries(
+        &self,
+        db: &str,
+        queries: &[Query],
+    ) -> ServerResult<Vec<(String, ObjectResult)>> {
+        let mut results = Vec::with_capacity(queries.len());
 
-    for q in queries {
-        let (table_name, sql) = prometheus::query_to_sql(db.name(), q)?;
+        for query in queries {
+            let (table_name, sql) = prometheus::query_to_sql(db, query)?;
+            logging::debug!(
+                "prometheus remote read, table: {}, sql: {}",
+                table_name,
+                sql
+            );
 
-        logging::debug!(
-            "prometheus remote read, table: {}, sql: {}",
-            table_name,
-            sql
-        );
-
-        let object_result = db
-            .select(Select::Sql(sql.clone()))
-            .await
+            let object_result = if let Some(dist_instance) = &self.dist_instance {
+                let output = futures::future::ready(parse_stmt(&sql))
+                    .and_then(|stmt| dist_instance.handle_sql(&sql, stmt))
+                    .await;
+                to_object_result(output).await.try_into()
+            } else {
+                self.database(db).select(Select::Sql(sql.clone())).await
+            }
             .map_err(BoxedError::new)
             .context(error::ExecuteQuerySnafu { query: sql })?;
 
-        results.push((table_name, object_result));
+            results.push((table_name, object_result));
+        }
+        Ok(results)
     }
-
-    Ok(results)
 }
 
 #[async_trait]
@@ -138,7 +146,9 @@ impl PrometheusProtocolHandler for Instance {
         let response_type = negotiate_response_type(&request.accepted_response_types)?;
 
         // TODO(dennis): use read_hints to speedup query if possible
-        let results = handle_remote_queries(&self.database(database), &request.queries).await?;
+        let results = self
+            .handle_remote_queries(database, &request.queries)
+            .await?;
 
         match response_type {
             ResponseType::Samples => {
diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs
index 1059f1081d..82807d0582 100644
--- a/src/frontend/src/lib.rs
+++ b/src/frontend/src/lib.rs
@@ -22,7 +22,6 @@ pub mod frontend;
 pub mod grpc;
 pub mod influxdb;
 pub mod instance;
-pub(crate) mod mock;
 pub mod mysql;
 pub mod opentsdb;
 pub mod partitioning;
diff --git a/src/frontend/src/mysql.rs b/src/frontend/src/mysql.rs
index 56ef19834a..71bb600753 100644
--- a/src/frontend/src/mysql.rs
+++ b/src/frontend/src/mysql.rs
@@ -23,7 +23,7 @@ pub struct MysqlOptions {
 impl Default for MysqlOptions {
     fn default() -> Self {
         Self {
-            addr: "0.0.0.0:4002".to_string(),
+            addr: "127.0.0.1:4002".to_string(),
             runtime_size: 2,
         }
     }
diff --git a/src/frontend/src/opentsdb.rs b/src/frontend/src/opentsdb.rs
index c905189a17..16cc5c5fa8 100644
--- a/src/frontend/src/opentsdb.rs
+++ b/src/frontend/src/opentsdb.rs
@@ -23,7 +23,7 @@ pub struct OpentsdbOptions {
 impl Default for OpentsdbOptions {
     fn default() -> Self {
         Self {
-            addr: "0.0.0.0:4242".to_string(),
+            addr: "127.0.0.1:4242".to_string(),
             runtime_size: 2,
         }
     }
diff --git a/src/frontend/src/postgres.rs b/src/frontend/src/postgres.rs
index a77243ce37..41a11233bc 100644
--- a/src/frontend/src/postgres.rs
+++ b/src/frontend/src/postgres.rs
@@ -24,7 +24,7 @@ pub struct PostgresOptions {
 impl Default for PostgresOptions {
     fn default() -> Self {
         Self {
-            addr: "0.0.0.0:4003".to_string(),
+            addr: "127.0.0.1:4003".to_string(),
             runtime_size: 2,
             check_pwd: false,
         }
diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs
index 9d34f1a81c..da41bbe46c 100644
--- a/src/frontend/src/table.rs
+++ b/src/frontend/src/table.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use client::Database;
+use common_error::prelude::BoxedError;
 use common_query::error::Result as QueryResult;
 use common_query::logical_plan::Expr;
 use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
@@ -40,7 +41,6 @@ use tokio::sync::RwLock;
 
 use crate::datanode::DatanodeClients;
 use crate::error::{self, Error, Result};
-use crate::mock::{DatanodeInstance, TableScanPlan};
 use crate::partitioning::columns::RangeColumnsPartitionRule;
 use crate::partitioning::range::RangePartitionRule;
 use crate::partitioning::{
@@ -48,7 +48,10 @@ use crate::partitioning::{
 };
 use crate::spliter::WriteSpliter;
 use crate::table::route::TableRoutes;
+use crate::table::scan::{DatanodeInstance, TableScanPlan};
+
 pub mod insert;
+pub(crate) mod scan;
 
 #[derive(Clone)]
 pub struct DistTable {
@@ -399,7 +402,7 @@ impl PhysicalPlan for DistTableScan {
         _runtime: Arc<Runtime>,
     ) -> QueryResult<SendableRecordBatchStream> {
         let exec = &self.partition_execs[partition];
-        exec.maybe_init().await;
+        exec.maybe_init().await.map_err(BoxedError::new)?;
         Ok(exec.as_stream().await)
     }
 }
@@ -415,14 +418,14 @@ struct PartitionExec {
 }
 
 impl PartitionExec {
-    async fn maybe_init(&self) {
+    async fn maybe_init(&self) -> Result<()> {
         if self.batches.read().await.is_some() {
-            return;
+            return Ok(());
         }
 
         let mut batches = self.batches.write().await;
         if batches.is_some() {
-            return;
+            return Ok(());
         }
 
         let plan = TableScanPlan {
@@ -431,8 +434,9 @@ impl PartitionExec {
             filters: self.filters.clone(),
             limit: self.limit,
         };
-        let result = self.datanode_instance.grpc_table_scan(plan).await;
+        let result = self.datanode_instance.grpc_table_scan(plan).await?;
         let _ = batches.insert(result);
+        Ok(())
     }
 
     async fn as_stream(&self) -> SendableRecordBatchStream {
diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs
index 09dabcb02a..ceb6780e13 100644
--- a/src/frontend/src/table/insert.rs
+++ b/src/frontend/src/table/insert.rs
@@ -29,7 +29,7 @@ use table::requests::InsertRequest;
 use super::DistTable;
 use crate::error;
 use crate::error::Result;
-use crate::mock::DatanodeInstance;
+use crate::table::scan::DatanodeInstance;
 
 impl DistTable {
     pub async fn dist_insert(
diff --git a/src/frontend/src/mock.rs b/src/frontend/src/table/scan.rs
similarity index 69%
rename from src/frontend/src/mock.rs
rename to src/frontend/src/table/scan.rs
index 9dfbcd256b..1919dc0fb6 100644
--- a/src/frontend/src/mock.rs
+++ b/src/frontend/src/table/scan.rs
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// FIXME(LFC): no mock
-
 use std::fmt::Formatter;
 use std::sync::Arc;
 
@@ -24,13 +22,16 @@ use common_query::Output;
 use common_recordbatch::{util, RecordBatches};
 use datafusion::logical_plan::{LogicalPlan as DfLogicPlan, LogicalPlanBuilder};
 use datafusion_expr::Expr as DfExpr;
-use datatypes::prelude::Value;
+use datatypes::prelude::*;
 use datatypes::schema::SchemaRef;
 use meta_client::rpc::TableName;
 use query::plan::LogicalPlan;
+use snafu::ResultExt;
 use table::table::adapter::DfTableProviderAdapter;
 use table::TableRef;
 
+use crate::error::{self, Result};
+
 #[derive(Clone)]
 pub struct DatanodeInstance {
     table: TableRef,
@@ -52,25 +53,33 @@ impl DatanodeInstance {
         self.db.insert(request).await
     }
 
-    pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> RecordBatches {
-        let logical_plan = self.build_logical_plan(&plan);
-        common_telemetry::info!("logical_plan: {:?}", logical_plan);
+    pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<RecordBatches> {
+        let logical_plan = self.build_logical_plan(&plan)?;
+
         // TODO(LFC): Directly pass in logical plan to GRPC interface when our substrait codec supports filter.
-        let sql = to_sql(logical_plan);
-        let result = self.db.select(Select::Sql(sql)).await.unwrap();
+        let sql = to_sql(logical_plan)?;
 
-        let output: Output = result.try_into().unwrap();
-        let recordbatches = match output {
-            Output::Stream(stream) => util::collect(stream).await.unwrap(),
-            Output::RecordBatches(x) => x.take(),
+        let output = self
+            .db
+            .select(Select::Sql(sql))
+            .await
+            .and_then(Output::try_from)
+            .context(error::SelectSnafu)?;
+
+        Ok(match output {
+            Output::Stream(stream) => {
+                let schema = stream.schema();
+                let batches = util::collect(stream)
+                    .await
+                    .context(error::CollectRecordbatchStreamSnafu)?;
+                RecordBatches::try_new(schema, batches).context(error::CreateRecordbatchesSnafu)?
+            }
+            Output::RecordBatches(x) => x,
             _ => unreachable!(),
-        };
-
-        let schema = recordbatches.first().unwrap().schema.clone();
-        RecordBatches::try_new(schema, recordbatches).unwrap()
+        })
     }
 
-    fn build_logical_plan(&self, table_scan: &TableScanPlan) -> LogicalPlan {
+    fn build_logical_plan(&self, table_scan: &TableScanPlan) -> Result<LogicalPlan> {
         let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
 
         let mut builder = LogicalPlanBuilder::scan_with_filters(
@@ -83,13 +92,16 @@
                 .map(|x| x.df_expr().clone())
                 .collect::<Vec<_>>(),
         )
-        .unwrap();
+        .context(error::BuildDfLogicalPlanSnafu)?;
+
         if let Some(limit) = table_scan.limit {
-            builder = builder.limit(limit).unwrap();
+            builder = builder
+                .limit(limit)
+                .context(error::BuildDfLogicalPlanSnafu)?;
         }
 
-        let plan = builder.build().unwrap();
-        LogicalPlan::DfPlan(plan)
+        let plan = builder.build().context(error::BuildDfLogicalPlanSnafu)?;
+        Ok(LogicalPlan::DfPlan(plan))
     }
 }
@@ -101,14 +113,20 @@ pub(crate) struct TableScanPlan {
     pub limit: Option<usize>,
 }
 
-fn to_sql(plan: LogicalPlan) -> String {
+fn to_sql(plan: LogicalPlan) -> Result<String> {
     let LogicalPlan::DfPlan(plan) = plan;
     let table_scan = match plan {
         DfLogicPlan::TableScan(table_scan) => table_scan,
         _ => unreachable!("unknown plan: {:?}", plan),
     };
 
-    let schema: SchemaRef = Arc::new(table_scan.source.schema().try_into().unwrap());
+    let schema: SchemaRef = Arc::new(
+        table_scan
+            .source
+            .schema()
+            .try_into()
+            .context(error::ConvertArrowSchemaSnafu)?,
+    );
     let projection = table_scan
         .projection
         .map(|x| {
@@ -131,7 +149,7 @@
         .filters
         .iter()
         .map(expr_to_sql)
-        .collect::<Vec<String>>()
+        .collect::<Result<Vec<String>>>()?
         .join(" AND ");
     if !filters.is_empty() {
         sql.push_str(" where ");
@@ -142,30 +160,31 @@
         sql.push_str(" limit ");
         sql.push_str(&limit.to_string());
     }
-    sql
+    Ok(sql)
 }
 
-fn expr_to_sql(expr: &DfExpr) -> String {
-    match expr {
+fn expr_to_sql(expr: &DfExpr) -> Result<String> {
+    Ok(match expr {
         DfExpr::BinaryExpr {
             ref left,
             ref right,
             ref op,
         } => format!(
             "{} {} {}",
-            expr_to_sql(left.as_ref()),
+            expr_to_sql(left.as_ref())?,
             op,
-            expr_to_sql(right.as_ref())
+            expr_to_sql(right.as_ref())?
         ),
         DfExpr::Column(c) => c.name.clone(),
         DfExpr::Literal(sv) => {
-            let v: Value = Value::try_from(sv.clone()).unwrap();
-            if v.data_type().is_string() {
+            let v: Value = Value::try_from(sv.clone())
+                .with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?;
+            if matches!(v.data_type(), ConcreteDataType::String(_)) {
                 format!("'{}'", sv)
             } else {
                 format!("{}", sv)
             }
         }
         _ => unimplemented!("not implemented for {:?}", expr),
-    }
+    })
 }
diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs
index 18ec172c10..7e59bb3908 100644
--- a/src/frontend/src/tests.rs
+++ b/src/frontend/src/tests.rs
@@ -69,8 +69,8 @@ pub(crate) async fn create_datanode_client(
     // Move client to an option so we can _move_ the inner value
     // on the first attempt to connect. All other attempts will fail.
     let mut client = Some(client);
-    // "0.0.0.0:3001" is just a placeholder, does not actually connect to it.
-    let addr = "0.0.0.0:3001";
+    // "127.0.0.1:3001" is just a placeholder, does not actually connect to it.
+    let addr = "127.0.0.1:3001";
     let channel_manager = ChannelManager::new();
     channel_manager
         .reset_with_connector(
diff --git a/src/meta-srv/src/handler/datanode_lease.rs b/src/meta-srv/src/handler/datanode_lease.rs
index ad2467287d..66f94eeb31 100644
--- a/src/meta-srv/src/handler/datanode_lease.rs
+++ b/src/meta-srv/src/handler/datanode_lease.rs
@@ -78,7 +78,7 @@ mod tests {
         let kv_store = Arc::new(MemStore::new());
         let ctx = Context {
             datanode_lease_secs: 30,
-            server_addr: "0.0.0.0:0000".to_string(),
+            server_addr: "127.0.0.1:0000".to_string(),
             kv_store,
             election: None,
             skip_all: Arc::new(AtomicBool::new(false)),
diff --git a/src/meta-srv/src/handler/response_header.rs b/src/meta-srv/src/handler/response_header.rs
index 97981eb497..509d3e9aef 100644
--- a/src/meta-srv/src/handler/response_header.rs
+++ b/src/meta-srv/src/handler/response_header.rs
@@ -55,7 +55,7 @@ mod tests {
         let kv_store = Arc::new(MemStore::new());
         let ctx = Context {
             datanode_lease_secs: 30,
-            server_addr: "0.0.0.0:0000".to_string(),
+            server_addr: "127.0.0.1:0000".to_string(),
             kv_store,
             election: None,
             skip_all: Arc::new(AtomicBool::new(false)),
diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs
index db292042c7..71a24acbd6 100644
--- a/src/meta-srv/src/keys.rs
+++ b/src/meta-srv/src/keys.rs
@@ -182,7 +182,7 @@ mod tests {
     fn test_datanode_lease_value() {
         let value = LeaseValue {
             timestamp_millis: 111,
-            node_addr: "0.0.0.0:3002".to_string(),
+            node_addr: "127.0.0.1:3002".to_string(),
         };
 
         let value_bytes: Vec<u8> = value.clone().try_into().unwrap();
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index e0f2548ce1..e6a8373f18 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -42,9 +42,9 @@ pub struct MetaSrvOptions {
 impl Default for MetaSrvOptions {
     fn default() -> Self {
         Self {
-            bind_addr: "0.0.0.0:3002".to_string(),
-            server_addr: "0.0.0.0:3002".to_string(),
-            store_addr: "0.0.0.0:2379".to_string(),
+            bind_addr: "127.0.0.1:3002".to_string(),
+            server_addr: "127.0.0.1:3002".to_string(),
+            store_addr: "127.0.0.1:2379".to_string(),
             datanode_lease_secs: 15,
         }
     }
diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs
index 9d24b5ebf0..3b9ee90b29 100644
--- a/src/sql/src/statements.rs
+++ b/src/sql/src/statements.rs
@@ -74,7 +74,7 @@ fn parse_string_to_value(
     data_type: &ConcreteDataType,
 ) -> Result<Value> {
     ensure!(
-        data_type.is_string(),
+        data_type.stringifiable(),
         ColumnTypeMismatchSnafu {
             column_name,
             expect: data_type.clone(),