diff --git a/Cargo.lock b/Cargo.lock index 33df1779b8..483297a964 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1984,7 +1984,6 @@ dependencies = [ "datafusion", "datafusion-common 7.0.0", "datatypes", - "frontend", "futures", "hyper", "log-store", diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 7856c66e16..c57cda3f97 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -25,12 +25,6 @@ pub enum Error { source: datanode::error::Error, }, - #[snafu(display("Failed to build frontend, source: {}", source))] - BuildFrontend { - #[snafu(backtrace)] - source: frontend::error::Error, - }, - #[snafu(display("Failed to start frontend, source: {}", source))] StartFrontend { #[snafu(backtrace)] @@ -75,7 +69,6 @@ impl ErrorExt for Error { StatusCode::InvalidArguments } Error::IllegalConfig { .. } => StatusCode::InvalidArguments, - Error::BuildFrontend { source, .. } => source.status_code(), } } diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 100f411d30..e395d0912b 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -78,7 +78,7 @@ impl StartCommand { let opts: FrontendOptions = self.try_into()?; let mut frontend = Frontend::new( opts.clone(), - Instance::try_new(&opts) + Instance::try_new_distributed(&opts) .await .context(error::StartFrontendSnafu)?, ); @@ -213,7 +213,6 @@ mod tests { let fe_opts = FrontendOptions::try_from(command).unwrap(); assert_eq!(Mode::Distributed, fe_opts.mode); - assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr); assert_eq!( "127.0.0.1:4000".to_string(), fe_opts.http_options.as_ref().unwrap().addr diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b3a86e3fb3..e72166b303 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -28,11 +28,8 @@ use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use servers::Mode; use snafu::ResultExt; -use tokio::try_join; -use crate::error::{ - BuildFrontendSnafu, Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu, -}; +use crate::error::{Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu}; use crate::toml_loader; #[derive(Parser)] @@ -104,7 +101,6 @@ impl StandaloneOptions { influxdb_options: self.influxdb_options, prometheus_options: self.prometheus_options, mode: self.mode, - datanode_rpc_addr: "127.0.0.1:3001".to_string(), meta_client_opts: None, } } @@ -162,7 +158,7 @@ impl StartCommand { let mut datanode = Datanode::new(dn_opts.clone()) .await .context(StartDatanodeSnafu)?; - let mut frontend = build_frontend(fe_opts, &dn_opts, datanode.get_instance()).await?; + let mut frontend = build_frontend(fe_opts, datanode.get_instance()).await?; // Start datanode instance before starting services, to avoid requests come in before internal components are started. datanode @@ -171,11 +167,7 @@ impl StartCommand { .context(StartDatanodeSnafu)?; info!("Datanode instance started"); - try_join!( - async { datanode.start_services().await.context(StartDatanodeSnafu) }, - async { frontend.start().await.context(StartFrontendSnafu) } - )?; - + frontend.start().await.context(StartFrontendSnafu)?; Ok(()) } } @@ -183,17 +175,9 @@ impl StartCommand { /// Build frontend instance in standalone mode async fn build_frontend( fe_opts: FrontendOptions, - dn_opts: &DatanodeOptions, datanode_instance: InstanceRef, ) -> Result> { - let grpc_server_addr = &dn_opts.rpc_addr; - info!( - "Build frontend with datanode gRPC addr: {}", - grpc_server_addr - ); - let mut frontend_instance = FeInstance::try_new(&fe_opts) - .await - .context(BuildFrontendSnafu)?; + let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone()); frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone()); frontend_instance.set_script_handler(datanode_instance); Ok(Frontend::new(fe_opts, frontend_instance)) @@ -289,7 +273,6 @@ mod tests { let fe_opts = FrontendOptions::try_from(cmd).unwrap(); assert_eq!(Mode::Standalone, fe_opts.mode); - assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr); assert_eq!( "127.0.0.1:4000".to_string(), fe_opts.http_options.as_ref().unwrap().addr diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 0801370dbd..516f697d3b 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -23,7 +23,7 @@ use common_base::BitVec; use common_error::prelude::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; -use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use datatypes::arrow::array::{Array, BooleanArray, PrimitiveArray}; use datatypes::arrow_array::{BinaryArray, StringArray}; use datatypes::schema::SchemaRef; @@ -47,13 +47,9 @@ pub async fn to_object_result(output: std::result::Result } } async fn collect(stream: SendableRecordBatchStream) -> Result { - let schema = stream.schema(); - - let recordbatches = util::collect(stream) + let recordbatches = RecordBatches::try_collect(stream) .await - .and_then(|batches| RecordBatches::try_new(schema, batches)) .context(error::CollectRecordBatchesSnafu)?; - let object_result = build_result(recordbatches)?; Ok(object_result) } diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index ce2c2f1e5a..2809040326 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -27,7 +27,7 @@ use datatypes::prelude::VectorRef; use datatypes::schema::{Schema, SchemaRef}; use error::Result; use futures::task::{Context, Poll}; -use futures::Stream; +use futures::{Stream, TryStreamExt}; pub use recordbatch::RecordBatch; use snafu::ensure; @@ -80,6 +80,12 @@ impl RecordBatches { Ok(Self { schema, batches }) } + pub async fn try_collect(stream: SendableRecordBatchStream) -> Result { + let schema = stream.schema(); + let batches = stream.try_collect::>().await?; + Ok(Self { schema, batches }) + } + #[inline] pub fn empty() -> Self { Self { diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 159ec0ef44..47f34d2186 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -29,7 +29,6 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "simd", ] } datatypes = { path = "../datatypes" } -frontend = { path = "../frontend" } futures = "0.3" hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 5e3eee4b94..452131d0d7 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -26,7 +26,7 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = ] } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } - +datanode = { path = "../datanode" } datatypes = { path = "../datatypes" } futures = "0.3" futures-util = "0.3" diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index aea667367f..0c66980334 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -59,6 +59,16 @@ impl FrontendCatalogManager { pub(crate) fn backend(&self) -> KvBackendRef { self.backend.clone() } + + #[cfg(test)] + pub(crate) fn table_routes(&self) -> Arc { + self.table_routes.clone() + } + + #[cfg(test)] + pub(crate) fn datanode_clients(&self) -> Arc { + self.datanode_clients.clone() + } } // FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 823ce693ce..eae56a12f8 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -244,18 +244,6 @@ pub enum Error { source: client::Error, }, - #[snafu(display("Failed to alter table, source: {}", source))] - AlterTable { - #[snafu(backtrace)] - source: client::Error, - }, - - #[snafu(display("Failed to drop table, source: {}", source))] - DropTable { - #[snafu(backtrace)] - source: client::Error, - }, - #[snafu(display("Failed to insert values to table, source: {}", source))] Insert { #[snafu(backtrace)] @@ -398,9 +386,6 @@ pub enum Error { source: query::error::Error, }, - #[snafu(display("Unsupported expr type: {}", name))] - UnsupportedExpr { name: String, backtrace: Backtrace }, - #[snafu(display("Failed to do vector computation, source: {}", source))] VectorComputation { #[snafu(backtrace)] @@ -451,6 +436,12 @@ pub enum Error { #[snafu(backtrace)] source: substrait::error::Error, }, + + #[snafu(display("Failed to invoke GRPC server, source: {}", source))] + InvokeGrpcServer { + #[snafu(backtrace)] + source: servers::error::Error, + }, } pub type Result = std::result::Result; @@ -470,7 +461,9 @@ impl ErrorExt for Error { Error::RuntimeResource { source, .. } => source.status_code(), - Error::StartServer { source, .. } => source.status_code(), + Error::StartServer { source, .. } | Error::InvokeGrpcServer { source } => { + source.status_code() + } Error::ParseSql { source } => source.status_code(), @@ -500,7 +493,6 @@ impl ErrorExt for Error { | Error::FindLeaderPeer { .. } | Error::FindRegionPartition { .. } | Error::IllegalTableRoutesData { .. } - | Error::UnsupportedExpr { .. } | Error::BuildDfLogicalPlan { .. } => StatusCode::Internal, Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => { @@ -522,8 +514,6 @@ impl ErrorExt for Error { Error::SchemaNotFound { .. } => StatusCode::InvalidArguments, Error::CatalogNotFound { .. } => StatusCode::InvalidArguments, Error::CreateTable { source, .. } - | Error::AlterTable { source, .. } - | Error::DropTable { source } | Error::Select { source, .. } | Error::CreateDatabase { source, .. } | Error::CreateTableOnInsertion { source, .. } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 521ed6c834..92f8fa3b0d 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -40,7 +40,6 @@ pub struct FrontendOptions { pub influxdb_options: Option, pub prometheus_options: Option, pub mode: Mode, - pub datanode_rpc_addr: String, pub meta_client_opts: Option, } @@ -55,18 +54,11 @@ impl Default for FrontendOptions { influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), mode: Mode::Standalone, - datanode_rpc_addr: "127.0.0.1:3001".to_string(), meta_client_opts: None, } } } -impl FrontendOptions { - pub(crate) fn datanode_grpc_addr(&self) -> String { - self.datanode_rpc_addr.clone() - } -} - pub struct Frontend where T: FrontendInstance, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index b1c04389a7..1616825157 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -20,50 +20,49 @@ mod prometheus; use std::sync::Arc; use std::time::Duration; -use api::result::ObjectResultBuilder; +use api::result::{ObjectResultBuilder, PROTOCOL_VERSION}; use api::v1::alter_expr::Kind; use api::v1::object_expr::Expr; use api::v1::{ - admin_expr, select_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, - CreateDatabaseExpr, CreateExpr, DropTableExpr, InsertExpr, ObjectExpr, + admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr, + CreateExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult, }; use async_trait::async_trait; use catalog::remote::MetaKvBackend; use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef}; -use client::admin::{admin_result_to_output, Admin}; -use client::{Client, Database, Select}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_error::prelude::{BoxedError, StatusCode}; +use client::admin::admin_result_to_output; +use client::ObjectResult; +use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_grpc::select::to_object_result; use common_query::Output; use common_recordbatch::RecordBatches; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, info}; +use datanode::instance::InstanceRef as DnInstanceRef; use distributed::DistInstance; -use meta_client::client::MetaClientBuilder; +use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; use servers::query_handler::{ - GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, - PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler, + GrpcAdminHandler, GrpcAdminHandlerRef, GrpcQueryHandler, GrpcQueryHandlerRef, + InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler, + ScriptHandlerRef, SqlQueryHandler, SqlQueryHandlerRef, }; use servers::{error as server_error, Mode}; -use session::context::{QueryContext, QueryContextRef}; +use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::GenericDialect; use sql::parser::ParserContext; use sql::statements::create::Partitions; -use sql::statements::explain::Explain; use sql::statements::insert::Insert; use sql::statements::statement::Statement; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ - self, AlterTableOnInsertionSnafu, AlterTableSnafu, CatalogNotFoundSnafu, CatalogSnafu, - CreateDatabaseSnafu, CreateTableSnafu, DropTableSnafu, FindNewColumnsOnInsertionSnafu, - InsertSnafu, MissingMetasrvOptsSnafu, Result, SchemaNotFoundSnafu, SelectSnafu, - UnsupportedExprSnafu, + self, AlterTableOnInsertionSnafu, CatalogNotFoundSnafu, CatalogSnafu, CreateDatabaseSnafu, + CreateTableSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result, + SchemaNotFoundSnafu, }; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; @@ -91,9 +90,6 @@ pub type FrontendInstanceRef = Arc; #[derive(Clone)] pub struct Instance { - // TODO(hl): In standalone mode, there is only one client. - // But in distribute mode, frontend should fetch datanodes' addresses from metasrv. - client: Client, /// catalog manager is None in standalone mode, datanode will keep their own catalog_manager: Option, /// Script handler is None in distributed mode, only works on standalone mode. @@ -103,94 +99,87 @@ pub struct Instance { // Standalone and Distributed, then the code behind it doesn't need to use so // many match statements. mode: Mode, - // TODO(LFC): Refactor consideration: Can we split Frontend to DistInstance and EmbedInstance? - dist_instance: Option, -} -impl Default for Instance { - fn default() -> Self { - Self { - client: Client::default(), - catalog_manager: None, - script_handler: None, - create_expr_factory: Arc::new(DefaultCreateExprFactory {}), - mode: Mode::Standalone, - dist_instance: None, - } - } + // TODO(LFC): Remove `dist_instance` together with Arrow Flight adoption refactor. + dist_instance: Option, + + sql_handler: SqlQueryHandlerRef, + grpc_query_handler: GrpcQueryHandlerRef, + grpc_admin_handler: GrpcAdminHandlerRef, } impl Instance { - pub async fn try_new(opts: &FrontendOptions) -> Result { - let mut instance = Instance { - mode: opts.mode.clone(), - ..Default::default() - }; + pub async fn try_new_distributed(opts: &FrontendOptions) -> Result { + let meta_client = Self::create_meta_client(opts).await?; - let addr = opts.datanode_grpc_addr(); - instance.client.start(vec![addr]); + let meta_backend = Arc::new(MetaKvBackend { + client: meta_client.clone(), + }); + let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); + let datanode_clients = Arc::new(DatanodeClients::new()); + let catalog_manager = Arc::new(FrontendCatalogManager::new( + meta_backend, + table_routes, + datanode_clients.clone(), + )); - instance.dist_instance = match &opts.mode { - Mode::Standalone => None, - Mode::Distributed => { - let metasrv_addr = &opts - .meta_client_opts - .as_ref() - .context(MissingMetasrvOptsSnafu)? - .metasrv_addrs; - info!( - "Creating Frontend instance in distributed mode with Meta server addr {:?}", - metasrv_addr - ); + let dist_instance = + DistInstance::new(meta_client, catalog_manager.clone(), datanode_clients); + let dist_instance_ref = Arc::new(dist_instance.clone()); - let meta_config = MetaClientOpts::default(); - let channel_config = ChannelConfig::new() - .timeout(Duration::from_millis(meta_config.timeout_millis)) - .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) - .tcp_nodelay(meta_config.tcp_nodelay); - - let channel_manager = ChannelManager::with_config(channel_config); - - let mut meta_client = MetaClientBuilder::new(0, 0) - .enable_router() - .enable_store() - .channel_manager(channel_manager) - .build(); - meta_client - .start(metasrv_addr) - .await - .context(error::StartMetaClientSnafu)?; - let meta_client = Arc::new(meta_client); - - let meta_backend = Arc::new(MetaKvBackend { - client: meta_client.clone(), - }); - let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); - let datanode_clients = Arc::new(DatanodeClients::new()); - let catalog_manager = Arc::new(FrontendCatalogManager::new( - meta_backend, - table_routes, - datanode_clients.clone(), - )); - - instance.catalog_manager = Some(catalog_manager.clone()); - - Some(DistInstance::new( - meta_client, - catalog_manager, - datanode_clients, - )) - } - }; - Ok(instance) + Ok(Instance { + catalog_manager: Some(catalog_manager), + script_handler: None, + create_expr_factory: Arc::new(DefaultCreateExprFactory), + mode: Mode::Distributed, + dist_instance: Some(dist_instance), + sql_handler: dist_instance_ref.clone(), + grpc_query_handler: dist_instance_ref.clone(), + grpc_admin_handler: dist_instance_ref, + }) } - pub fn database(&self, database: &str) -> Database { - Database::new(database, self.client.clone()) + async fn create_meta_client(opts: &FrontendOptions) -> Result> { + let metasrv_addr = &opts + .meta_client_opts + .as_ref() + .context(MissingMetasrvOptsSnafu)? + .metasrv_addrs; + info!( + "Creating Frontend instance in distributed mode with Meta server addr {:?}", + metasrv_addr + ); + + let meta_config = MetaClientOpts::default(); + let channel_config = ChannelConfig::new() + .timeout(Duration::from_millis(meta_config.timeout_millis)) + .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) + .tcp_nodelay(meta_config.tcp_nodelay); + let channel_manager = ChannelManager::with_config(channel_config); + + let mut meta_client = MetaClientBuilder::new(0, 0) + .enable_router() + .enable_store() + .channel_manager(channel_manager) + .build(); + meta_client + .start(metasrv_addr) + .await + .context(error::StartMetaClientSnafu)?; + Ok(Arc::new(meta_client)) } - pub fn admin(&self, database: &str) -> Admin { - Admin::new(database, self.client.clone()) + pub fn new_standalone(dn_instance: DnInstanceRef) -> Self { + Instance { + catalog_manager: None, + script_handler: None, + create_expr_factory: Arc::new(DefaultCreateExprFactory), + mode: Mode::Standalone, + dist_instance: None, + sql_handler: dn_instance.clone(), + grpc_query_handler: dn_instance.clone(), + grpc_admin_handler: dn_instance, + } } pub fn catalog_manager(&self) -> &Option { @@ -213,27 +202,6 @@ impl Instance { self.script_handler = Some(handler); } - async fn handle_select( - &self, - expr: Select, - stmt: Statement, - query_ctx: QueryContextRef, - ) -> Result { - if let Some(dist_instance) = &self.dist_instance { - let Select::Sql(sql) = expr; - dist_instance.handle_sql(&sql, stmt, query_ctx).await - } else { - // TODO(LFC): Refactor consideration: Datanode should directly execute statement in standalone mode to avoid parse SQL again. - // Find a better way to execute query between Frontend and Datanode in standalone mode. - // Otherwise we have to parse SQL first to get schema name. Maybe not GRPC. - self.database(DEFAULT_SCHEMA_NAME) - .select(expr) - .await - .and_then(Output::try_from) - .context(SelectSnafu) - } - } - /// Handle create expr. pub async fn handle_create_table( &self, @@ -243,81 +211,38 @@ impl Instance { if let Some(v) = &self.dist_instance { v.create_table(&mut expr, partitions).await } else { - // Currently standalone mode does not support multi partitions/regions. + let expr = AdminExpr { + header: Some(ExprHeader { + version: PROTOCOL_VERSION, + }), + expr: Some(admin_expr::Expr::Create(expr)), + }; let result = self - .admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME)) - .create(expr.clone()) - .await; - if let Err(e) = &result { - error!(e; "Failed to create table by expr: {:?}", expr); - } - result - .and_then(admin_result_to_output) - .context(CreateTableSnafu) + .grpc_admin_handler + .exec_admin_request(expr) + .await + .context(error::InvokeGrpcServerSnafu)?; + admin_result_to_output(result).context(CreateTableSnafu) } } /// Handle create database expr. pub async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result { let database_name = expr.database_name.clone(); - if let Some(dist_instance) = &self.dist_instance { - dist_instance.handle_create_database(expr).await - } else { - // FIXME(hl): In order to get admin client to create schema, we need to use the default schema admin - self.admin(DEFAULT_SCHEMA_NAME) - .create_database(expr) - .await - .and_then(admin_result_to_output) - .context(CreateDatabaseSnafu { - name: database_name, - }) - } - } - - /// Handle alter expr - pub async fn handle_alter(&self, expr: AlterExpr) -> Result { - match &self.dist_instance { - Some(dist_instance) => dist_instance.handle_alter_table(expr).await, - None => self - .admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME)) - .alter(expr) - .await - .and_then(admin_result_to_output) - .context(AlterTableSnafu), - } - } - - /// Handle drop table expr - pub async fn handle_drop_table(&self, expr: DropTableExpr) -> Result { - match self.mode { - Mode::Standalone => self - .admin(&expr.schema_name) - .drop_table(expr) - .await - .and_then(admin_result_to_output) - .context(DropTableSnafu), - // TODO(ruihang): support drop table in distributed mode - Mode::Distributed => UnsupportedExprSnafu { - name: "Distributed DROP TABLE", - } - .fail(), - } - } - - /// Handle explain expr - pub async fn handle_explain( - &self, - sql: &str, - explain_stmt: Explain, - query_ctx: QueryContextRef, - ) -> Result { - if let Some(dist_instance) = &self.dist_instance { - dist_instance - .handle_sql(sql, Statement::Explain(explain_stmt), query_ctx) - .await - } else { - Ok(Output::AffectedRows(0)) - } + let expr = AdminExpr { + header: Some(ExprHeader { + version: PROTOCOL_VERSION, + }), + expr: Some(admin_expr::Expr::CreateDatabase(expr)), + }; + let result = self + .grpc_admin_handler + .exec_admin_request(expr) + .await + .context(error::InvokeGrpcServerSnafu)?; + admin_result_to_output(result).context(CreateDatabaseSnafu { + name: database_name, + }) } /// Handle batch inserts @@ -333,7 +258,7 @@ impl Instance { } /// Handle insert. for 'values' insertion, create/alter the destination table on demand. - pub async fn handle_insert(&self, mut insert_expr: InsertExpr) -> Result { + async fn handle_insert(&self, mut insert_expr: InsertExpr) -> Result { let table_name = &insert_expr.table_name; let catalog_name = DEFAULT_CATALOG_NAME; let schema_name = &insert_expr.schema_name; @@ -345,11 +270,17 @@ impl Instance { insert_expr.region_number = 0; - self.database(schema_name) - .insert(insert_expr) + let query = ObjectExpr { + header: Some(ExprHeader { + version: PROTOCOL_VERSION, + }), + expr: Some(Expr::Insert(insert_expr)), + }; + let result = GrpcQueryHandler::do_query(&*self.grpc_query_handler, query) .await - .and_then(Output::try_from) - .context(InsertSnafu) + .context(error::InvokeGrpcServerSnafu)?; + let result: ObjectResult = result.try_into().context(InsertSnafu)?; + result.try_into().context(InsertSnafu) } // check if table already exist: @@ -455,11 +386,19 @@ impl Instance { catalog_name: Some(catalog_name.to_string()), kind: Some(Kind::AddColumns(add_columns)), }; - self.admin(schema_name) - .alter(expr) + + let expr = AdminExpr { + header: Some(ExprHeader { + version: PROTOCOL_VERSION, + }), + expr: Some(admin_expr::Expr::Alter(expr)), + }; + let result = self + .grpc_admin_handler + .exec_admin_request(expr) .await - .and_then(admin_result_to_output) - .context(AlterTableOnInsertionSnafu) + .context(error::InvokeGrpcServerSnafu)?; + admin_result_to_output(result).context(AlterTableOnInsertionSnafu) } fn get_catalog(&self, catalog_name: &str) -> Result { @@ -547,20 +486,6 @@ impl FrontendInstance for Instance { } } -#[cfg(test)] -impl Instance { - pub fn with_client_and_catalog_manager(client: Client, catalog: CatalogManagerRef) -> Self { - Self { - client, - catalog_manager: Some(catalog), - script_handler: None, - create_expr_factory: Arc::new(DefaultCreateExprFactory), - mode: Mode::Standalone, - dist_instance: None, - } - } -} - fn parse_stmt(sql: &str) -> Result { let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {}) .context(error::ParseSqlSnafu)?; @@ -587,12 +512,14 @@ impl SqlQueryHandler for Instance { .context(server_error::ExecuteQuerySnafu { query })?; match stmt { - Statement::ShowDatabases(_) + Statement::CreateDatabase(_) + | Statement::ShowDatabases(_) + | Statement::CreateTable(_) | Statement::ShowTables(_) | Statement::DescribeTable(_) + | Statement::Explain(_) | Statement::Query(_) => { - self.handle_select(Select::Sql(query.to_string()), stmt, query_ctx) - .await + return self.sql_handler.do_query(query, query_ctx).await; } Statement::Insert(insert) => match self.mode { Mode::Standalone => { @@ -629,30 +556,18 @@ impl SqlQueryHandler for Instance { Ok(Output::AffectedRows(affected)) } }, - Statement::CreateTable(create) => { - let create_expr = self - .create_expr_factory - .create_expr_by_stmt(&create) - .await - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query })?; - - self.handle_create_table(create_expr, create.partitions) - .await - } - Statement::CreateDatabase(c) => { - let expr = CreateDatabaseExpr { - database_name: c.name.to_string(), - }; - self.handle_create_database(expr).await - } Statement::Alter(alter_stmt) => { - self.handle_alter( - AlterExpr::try_from(alter_stmt) - .map_err(BoxedError::new) - .context(server_error::ExecuteAlterSnafu { query })?, - ) - .await + let expr = AlterExpr::try_from(alter_stmt) + .map_err(BoxedError::new) + .context(server_error::ExecuteAlterSnafu { query })?; + let expr = AdminExpr { + header: Some(ExprHeader { + version: PROTOCOL_VERSION, + }), + expr: Some(admin_expr::Expr::Alter(expr)), + }; + let result = self.grpc_admin_handler.exec_admin_request(expr).await?; + admin_result_to_output(result).context(error::InvalidAdminResultSnafu) } Statement::DropTable(drop_stmt) => { let expr = DropTableExpr { @@ -660,10 +575,14 @@ impl SqlQueryHandler for Instance { schema_name: drop_stmt.schema_name, table_name: drop_stmt.table_name, }; - self.handle_drop_table(expr).await - } - Statement::Explain(explain_stmt) => { - self.handle_explain(query, explain_stmt, query_ctx).await + let expr = AdminExpr { + header: Some(ExprHeader { + version: PROTOCOL_VERSION, + }), + expr: Some(admin_expr::Expr::DropTable(expr)), + }; + let result = self.grpc_admin_handler.exec_admin_request(expr).await?; + admin_result_to_output(result).context(error::InvalidAdminResultSnafu) } Statement::ShowCreateTable(_) => { return server_error::NotSupportedSnafu { feat: query }.fail(); @@ -703,81 +622,34 @@ impl ScriptHandler for Instance { #[async_trait] impl GrpcQueryHandler for Instance { async fn do_query(&self, query: ObjectExpr) -> server_error::Result { - if let Some(expr) = &query.expr { - match expr { - Expr::Insert(insert) => { - // TODO(fys): refactor, avoid clone - let result = self.handle_insert(insert.clone()).await; - result - .map(|o| match o { - Output::AffectedRows(rows) => ObjectResultBuilder::new() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0u32) - .build(), - _ => { - unreachable!() - } - }) - .map_err(BoxedError::new) - .with_context(|_| server_error::ExecuteQuerySnafu { - query: format!("{:?}", query), - }) - } - Expr::Select(select) => { - let select = select - .expr - .as_ref() - .context(server_error::InvalidQuerySnafu { - reason: "empty query", - })?; - match select { - select_expr::Expr::Sql(sql) => { - let query_ctx = Arc::new(QueryContext::new()); - let output = SqlQueryHandler::do_query(self, sql, query_ctx).await; - Ok(to_object_result(output).await) - } - _ => { - if self.dist_instance.is_some() { - return server_error::NotSupportedSnafu { - feat: "Executing plan directly in Frontend.", - } - .fail(); - } - // FIXME(hl): refactor - self.database(DEFAULT_SCHEMA_NAME) - .object(query.clone()) - .await - .map_err(BoxedError::new) - .with_context(|_| server_error::ExecuteQuerySnafu { - query: format!("{:?}", query), - }) - } - } - } - _ => server_error::NotSupportedSnafu { - feat: "Currently only insert and select is supported in GRPC service.", - } - .fail(), + let expr = query + .clone() + .expr + .context(server_error::InvalidQuerySnafu { + reason: "empty expr", + })?; + match expr { + Expr::Insert(insert_expr) => { + let output = self + .handle_insert(insert_expr.clone()) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { + query: format!("{:?}", insert_expr), + })?; + let object_result = match output { + Output::AffectedRows(rows) => ObjectResultBuilder::default() + .mutate_result(rows as _, 0) + .build(), + _ => unreachable!(), + }; + Ok(object_result) } - } else { - server_error::InvalidQuerySnafu { - reason: "empty query", - } - .fail() + _ => GrpcQueryHandler::do_query(&*self.grpc_query_handler, query).await, } } } -fn get_schema_name(expr: &AdminExpr) -> &str { - let schema_name = match &expr.expr { - Some(admin_expr::Expr::Create(expr)) => expr.schema_name.as_deref(), - Some(admin_expr::Expr::Alter(expr)) => expr.schema_name.as_deref(), - Some(admin_expr::Expr::CreateDatabase(_)) | None => Some(DEFAULT_SCHEMA_NAME), - Some(admin_expr::Expr::DropTable(expr)) => Some(expr.schema_name.as_ref()), - }; - schema_name.unwrap_or(DEFAULT_SCHEMA_NAME) -} - #[async_trait] impl GrpcAdminHandler for Instance { async fn exec_admin_request(&self, mut expr: AdminExpr) -> server_error::Result { @@ -786,13 +658,7 @@ impl GrpcAdminHandler for Instance { if let Some(api::v1::admin_expr::Expr::Create(create)) = &mut expr.expr { create.table_id = None; } - self.admin(get_schema_name(&expr)) - .do_request(expr.clone()) - .await - .map_err(BoxedError::new) - .with_context(|_| server_error::ExecuteQuerySnafu { - query: format!("{:?}", expr), - }) + self.grpc_admin_handler.exec_admin_request(expr).await } } @@ -808,6 +674,7 @@ mod tests { }; use datatypes::schema::ColumnDefaultConstraint; use datatypes::value::Value; + use session::context::QueryContext; use super::*; use crate::tests; @@ -853,7 +720,8 @@ mod tests { .await .unwrap(); match output { - Output::RecordBatches(recordbatches) => { + Output::Stream(stream) => { + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let pretty_print = recordbatches.pretty_print(); let pretty_print = pretty_print.lines().collect::>(); let expected = vec![ @@ -875,7 +743,8 @@ mod tests { .await .unwrap(); match output { - Output::RecordBatches(recordbatches) => { + Output::Stream(stream) => { + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let pretty_print = recordbatches.pretty_print(); let pretty_print = pretty_print.lines().collect::>(); let expected = vec![ diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index d32e12ee24..2613654f8f 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -16,12 +16,18 @@ use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::{AlterExpr, CreateDatabaseExpr, CreateExpr}; +use api::result::AdminResultBuilder; +use api::v1::{ + admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateExpr, ObjectExpr, + ObjectResult, +}; +use async_trait::async_trait; use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue}; use catalog::CatalogList; use chrono::DateTime; use client::admin::{admin_result_to_output, Admin}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::prelude::BoxedError; use common_query::Output; use common_telemetry::{debug, error, info}; use datatypes::prelude::ConcreteDataType; @@ -33,6 +39,8 @@ use meta_client::rpc::{ }; use query::sql::{describe_table, explain, show_databases, show_tables}; use query::{QueryEngineFactory, QueryEngineRef}; +use servers::error as server_error; +use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler}; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use sql::statements::create::Partitions; @@ -48,6 +56,8 @@ use crate::error::{ PrimaryKeyNotFoundSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu, TableNotFoundSnafu, }; +use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory}; +use crate::instance::parse_stmt; use crate::partitioning::{PartitionBound, PartitionDef}; use crate::table::DistTable; @@ -126,15 +136,12 @@ impl DistInstance { .context(error::InvalidAdminResultSnafu)?; } - Ok(Output::AffectedRows(region_routes.len())) + // Checked in real MySQL, it truly returns "0 rows affected". + Ok(Output::AffectedRows(0)) } - pub(crate) async fn handle_sql( - &self, - sql: &str, - stmt: Statement, - query_ctx: QueryContextRef, - ) -> Result { + async fn handle_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result { + let stmt = parse_stmt(sql)?; match stmt { Statement::Query(_) => { let plan = self @@ -143,6 +150,17 @@ impl DistInstance { .context(error::ExecuteSqlSnafu { sql })?; self.query_engine.execute(&plan).await } + Statement::CreateDatabase(stmt) => { + let expr = CreateDatabaseExpr { + database_name: stmt.name.to_string(), + }; + self.handle_create_database(expr).await?; + Ok(Output::AffectedRows(1)) + } + Statement::CreateTable(stmt) => { + let create_expr = &mut DefaultCreateExprFactory.create_expr_by_stmt(&stmt).await?; + Ok(self.create_table(create_expr, stmt.partitions).await?) + } Statement::ShowDatabases(stmt) => show_databases(stmt, self.catalog_manager.clone()), Statement::ShowTables(stmt) => { show_tables(stmt, self.catalog_manager.clone(), query_ctx) @@ -157,7 +175,7 @@ impl DistInstance { } /// Handles distributed database creation - pub(crate) async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result { + async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result<()> { let key = SchemaKey { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: expr.database_name, @@ -172,10 +190,10 @@ impl DistInstance { .with_key(key.to_string()) .with_value(value.as_bytes().context(CatalogEntrySerdeSnafu)?); client.put(request.into()).await.context(RequestMetaSnafu)?; - Ok(Output::AffectedRows(1)) + Ok(()) } - pub async fn handle_alter_table(&self, expr: AlterExpr) -> Result { + async fn handle_alter_table(&self, expr: AlterExpr) -> Result { let catalog_name = expr.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME); let schema_name = expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME); let table_name = expr.table_name.as_str(); @@ -200,7 +218,7 @@ impl DistInstance { .downcast_ref::() .expect("Table impl must be DistTable in distributed mode"); dist_table.alter_by_expr(expr).await?; - Ok(Output::AffectedRows(0)) + Ok(AdminResultBuilder::default().mutate_result(0, 0).build()) } async fn create_table_in_meta( @@ -269,6 +287,56 @@ impl DistInstance { } Ok(()) } + + #[cfg(test)] + pub(crate) fn catalog_manager(&self) -> Arc { + self.catalog_manager.clone() + } +} + +#[async_trait] +impl SqlQueryHandler for DistInstance { + async fn do_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> server_error::Result { + self.handle_sql(query, query_ctx) + .await + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query }) + } +} + +#[async_trait] +impl GrpcQueryHandler for DistInstance { + async fn do_query(&self, _: ObjectExpr) -> server_error::Result { + unimplemented!() + } +} + +#[async_trait] +impl GrpcAdminHandler for DistInstance { + async fn exec_admin_request(&self, query: AdminExpr) -> server_error::Result { + let expr = query + .clone() + .expr + .context(server_error::InvalidQuerySnafu { + reason: "empty expr", + })?; + match expr { + admin_expr::Expr::CreateDatabase(create_database) => self + .handle_create_database(create_database) + .await + .map(|_| AdminResultBuilder::default().mutate_result(1, 0).build()), + admin_expr::Expr::Alter(alter) => self.handle_alter_table(alter).await, + _ => unimplemented!(), + } + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { + query: format!("{:?}", query), + }) + } } fn create_table_global_value( @@ -454,12 +522,15 @@ fn find_partition_columns( #[cfg(test)] mod test { + use servers::query_handler::SqlQueryHandlerRef; + use session::context::QueryContext; use sql::parser::ParserContext; use sql::statements::statement::Statement; use sqlparser::dialect::GenericDialect; use super::*; use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory}; + use crate::tests::create_dist_instance; #[tokio::test] async fn test_parse_partitions() { @@ -492,9 +563,10 @@ ENGINE=mito", let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); match &result[0] { Statement::CreateTable(c) => { - common_telemetry::info!("{}", sql); - let factory = DefaultCreateExprFactory {}; - let expr = factory.create_expr_by_stmt(c).await.unwrap(); + let expr = DefaultCreateExprFactory + .create_expr_by_stmt(c) + .await + .unwrap(); let partitions = parse_partitions(&expr, c.partitions.clone()).unwrap(); let json = serde_json::to_string(&partitions).unwrap(); assert_eq!(json, expected); @@ -503,4 +575,103 @@ ENGINE=mito", } } } + + #[tokio::test(flavor = "multi_thread")] + async fn test_show_databases() { + let (dist_instance, _) = create_dist_instance().await; + + let sql = "create database test_show_databases"; + let output = dist_instance + .handle_sql(sql, QueryContext::arc()) + .await + .unwrap(); + match output { + Output::AffectedRows(rows) => assert_eq!(rows, 1), + _ => unreachable!(), + } + + let sql = "show databases"; + let output = dist_instance + .handle_sql(sql, QueryContext::arc()) + .await + .unwrap(); + match output { + Output::RecordBatches(r) => { + let expected1 = vec![ + "+---------------------+", + "| Schemas |", + "+---------------------+", + "| public |", + "| test_show_databases |", + "+---------------------+", + ]; + let expected2 = vec![ + "+---------------------+", + "| Schemas |", + "+---------------------+", + "| test_show_databases |", + "| public |", + "+---------------------+", + ]; + let pretty = r.pretty_print(); + let lines = pretty.lines().collect::>(); + assert!(lines == expected1 || lines == expected2) + } + _ => unreachable!(), + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_show_tables() { + let (dist_instance, datanode_instances) = create_dist_instance().await; + + let sql = "create database test_show_tables"; + dist_instance + .handle_sql(sql, QueryContext::arc()) + .await + .unwrap(); + + let sql = " + CREATE TABLE greptime.test_show_tables.dist_numbers ( + ts BIGINT, + n INT, + TIME INDEX (ts), + ) + PARTITION BY RANGE COLUMNS (n) ( + PARTITION r0 VALUES LESS THAN (10), + PARTITION r1 VALUES LESS THAN (20), + PARTITION r2 VALUES LESS THAN (50), + PARTITION r3 VALUES LESS THAN (MAXVALUE), + ) + ENGINE=mito"; + dist_instance + .handle_sql(sql, QueryContext::arc()) + .await + .unwrap(); + + async fn assert_show_tables(instance: SqlQueryHandlerRef) { + let sql = "show tables in test_show_tables"; + let output = instance.do_query(sql, QueryContext::arc()).await.unwrap(); + match output { + Output::RecordBatches(r) => { + let expected = vec![ + "+--------------+", + "| Tables |", + "+--------------+", + "| dist_numbers |", + "+--------------+", + ]; + assert_eq!(r.pretty_print().lines().collect::>(), expected); + } + _ => unreachable!(), + } + } + + assert_show_tables(Arc::new(dist_instance)).await; + + // Asserts that new table is created in Datanode as well. + for x in datanode_instances.values() { + assert_show_tables(x.clone()).await + } + } } diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 66b04b1317..e2c0c91ee0 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -19,7 +19,6 @@ use servers::query_handler::OpentsdbProtocolHandler; use servers::{error as server_error, Mode}; use snafu::prelude::*; -use crate::error::Result; use crate::instance::Instance; #[async_trait] @@ -29,12 +28,7 @@ impl OpentsdbProtocolHandler for Instance { // metric table and tags can be created upon insertion. match self.mode { Mode::Standalone => { - self.insert_opentsdb_metric(data_point) - .await - .map_err(BoxedError::new) - .with_context(|_| server_error::PutOpentsdbDataPointSnafu { - data_point: format!("{:?}", data_point), - })?; + self.insert_opentsdb_metric(data_point).await?; } Mode::Distributed => { self.dist_insert(vec![data_point.as_grpc_insert()]) @@ -51,9 +45,14 @@ impl OpentsdbProtocolHandler for Instance { } impl Instance { - async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> { - let expr = data_point.as_grpc_insert(); - self.handle_insert(expr).await?; + async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> server_error::Result<()> { + let insert_expr = data_point.as_grpc_insert(); + self.handle_insert(insert_expr) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { + query: format!("{:?}", data_point), + })?; Ok(()) } } @@ -63,6 +62,7 @@ mod tests { use std::sync::Arc; use common_query::Output; + use common_recordbatch::RecordBatches; use datafusion::arrow_print; use servers::query_handler::SqlQueryHandler; use session::context::QueryContext; @@ -128,7 +128,8 @@ mod tests { .await .unwrap(); match output { - Output::RecordBatches(recordbatches) => { + Output::Stream(stream) => { + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let recordbatches = recordbatches .take() .into_iter() diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index b6f322beb2..b1ad7ad53c 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -17,11 +17,10 @@ use std::sync::Arc; use api::prometheus::remote::read_request::ResponseType; use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; use async_trait::async_trait; -use client::{ObjectResult, Select}; +use client::ObjectResult; use common_error::prelude::BoxedError; use common_grpc::select::to_object_result; use common_telemetry::logging; -use futures_util::TryFutureExt; use prost::Message; use servers::error::{self, Result as ServerResult}; use servers::prometheus::{self, Metrics}; @@ -30,7 +29,7 @@ use servers::Mode; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; -use crate::instance::{parse_stmt, Instance}; +use crate::instance::Instance; const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32; @@ -94,19 +93,14 @@ impl Instance { sql ); - let object_result = if let Some(dist_instance) = &self.dist_instance { - let output = futures::future::ready(parse_stmt(&sql)) - .and_then(|stmt| { - let query_ctx = Arc::new(QueryContext::with_current_schema(db.to_string())); - dist_instance.handle_sql(&sql, stmt, query_ctx) - }) - .await; - to_object_result(output).await.try_into() - } else { - self.database(db).select(Select::Sql(sql.clone())).await - } - .map_err(BoxedError::new) - .context(error::ExecuteQuerySnafu { query: sql })?; + let query_ctx = Arc::new(QueryContext::with_current_schema(db.to_string())); + let output = self.sql_handler.do_query(&sql, query_ctx).await; + + let object_result = to_object_result(output) + .await + .try_into() + .map_err(BoxedError::new) + .context(error::ExecuteQuerySnafu { query: sql })?; results.push((table_name, object_result)); } @@ -117,34 +111,25 @@ impl Instance { #[async_trait] impl PrometheusProtocolHandler for Instance { async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> { + let exprs = prometheus::write_request_to_insert_exprs(database, request.clone())?; match self.mode { Mode::Standalone => { - let exprs = prometheus::write_request_to_insert_exprs(database, request)?; - let futures = exprs - .into_iter() - .map(|e| self.handle_insert(e)) - .collect::>(); - let res = futures_util::future::join_all(futures) + self.handle_inserts(exprs) .await - .into_iter() - .collect::, crate::error::Error>>(); - res.map_err(BoxedError::new) - .context(error::ExecuteInsertSnafu { - msg: "failed to write prometheus remote request", + .map_err(BoxedError::new) + .with_context(|_| error::ExecuteInsertSnafu { + msg: format!("{:?}", request), })?; } Mode::Distributed => { - let inserts = prometheus::write_request_to_insert_exprs(database, request)?; - - self.dist_insert(inserts) + self.dist_insert(exprs) .await .map_err(BoxedError::new) - .context(error::ExecuteInsertSnafu { - msg: "execute insert failed", + .with_context(|_| error::ExecuteInsertSnafu { + msg: format!("{:?}", request), })?; } } - Ok(()) } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 36d229a245..ac97d2dc3c 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -505,42 +505,33 @@ impl PartitionExec { } } -// FIXME(LFC): no allow, for clippy temporarily -#[allow(clippy::print_stdout)] #[cfg(test)] mod test { - use std::time::Duration; - use api::v1::column::SemanticType; use api::v1::{column, Column, ColumnDataType}; - use catalog::remote::MetaKvBackend; - use common_recordbatch::util; - use datafusion::arrow_print; - use datafusion_common::record_batch::RecordBatch as DfRecordBatch; + use common_query::physical_plan::DfPhysicalPlanAdapter; + use common_recordbatch::adapter::RecordBatchStreamAdapter; + use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; + use datafusion::physical_plan::expressions::{col as physical_col, PhysicalSortExpr}; + use datafusion::physical_plan::sorts::sort::SortExec; + use datafusion::physical_plan::ExecutionPlan; use datafusion_expr::expr_fn::{and, binary_expr, col, or}; use datafusion_expr::lit; - use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; use datanode::instance::Instance; + use datatypes::arrow::compute::sort::SortOptions; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; - use meta_client::client::{MetaClient, MetaClientBuilder}; + use meta_client::client::MetaClient; use meta_client::rpc::router::RegionRoute; use meta_client::rpc::{Region, Table, TableRoute}; - use meta_srv::metasrv::MetaSrvOptions; - use meta_srv::mocks::MockInfo; - use meta_srv::service::store::kv::KvStoreRef; - use meta_srv::service::store::memory::MemStore; use sql::parser::ParserContext; use sql::statements::statement::Statement; use sqlparser::dialect::GenericDialect; use table::metadata::{TableInfoBuilder, TableMetaBuilder}; use table::TableRef; - use tempdir::TempDir; use super::*; - use crate::catalog::FrontendCatalogManager; use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory}; - use crate::instance::distributed::DistInstance; use crate::partitioning::range::RangePartitionRule; #[tokio::test(flavor = "multi_thread")] @@ -741,29 +732,78 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_dist_table_scan() { - common_telemetry::init_default_ut_logging(); let table = Arc::new(new_dist_table().await); // should scan all regions - // select * from numbers - let projection = None; + // select a, row_id from numbers + let projection = Some(vec![1, 2]); let filters = vec![]; - exec_table_scan(table.clone(), projection, filters, None).await; - println!(); + let expected_output = vec![ + "+-----+--------+", + "| a | row_id |", + "+-----+--------+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "| 3 | 4 |", + "| 4 | 5 |", + "| 10 | 1 |", + "| 11 | 2 |", + "| 12 | 3 |", + "| 13 | 4 |", + "| 14 | 5 |", + "| 30 | 1 |", + "| 31 | 2 |", + "| 32 | 3 |", + "| 33 | 4 |", + "| 34 | 5 |", + "| 100 | 1 |", + "| 101 | 2 |", + "| 102 | 3 |", + "| 103 | 4 |", + "| 104 | 5 |", + "+-----+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 4, expected_output).await; // should scan only region 1 // select a, row_id from numbers where a < 10 let projection = Some(vec![1, 2]); let filters = vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()]; - exec_table_scan(table.clone(), projection, filters, None).await; - println!(); + let expected_output = vec![ + "+---+--------+", + "| a | row_id |", + "+---+--------+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "| 3 | 4 |", + "| 4 | 5 |", + "+---+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 1, expected_output).await; // should scan region 1 and 2 // select a, row_id from numbers where a < 15 let projection = Some(vec![1, 2]); let filters = vec![binary_expr(col("a"), Operator::Lt, lit(15)).into()]; - exec_table_scan(table.clone(), projection, filters, None).await; - println!(); + let expected_output = vec![ + "+----+--------+", + "| a | row_id |", + "+----+--------+", + "| 0 | 1 |", + "| 1 | 2 |", + "| 2 | 3 |", + "| 3 | 4 |", + "| 4 | 5 |", + "| 10 | 1 |", + "| 11 | 2 |", + "| 12 | 3 |", + "| 13 | 4 |", + "| 14 | 5 |", + "+----+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 2, expected_output).await; // should scan region 2 and 3 // select a, row_id from numbers where a < 40 and a >= 10 @@ -773,8 +813,23 @@ mod test { binary_expr(col("a"), Operator::GtEq, lit(10)), ) .into()]; - exec_table_scan(table.clone(), projection, filters, None).await; - println!(); + let expected_output = vec![ + "+----+--------+", + "| a | row_id |", + "+----+--------+", + "| 10 | 1 |", + "| 11 | 2 |", + "| 12 | 3 |", + "| 13 | 4 |", + "| 14 | 5 |", + "| 30 | 1 |", + "| 31 | 2 |", + "| 32 | 3 |", + "| 33 | 4 |", + "| 34 | 5 |", + "+----+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 2, expected_output).await; // should scan all regions // select a, row_id from numbers where a < 1000 and row_id == 1 @@ -784,36 +839,59 @@ mod test { binary_expr(col("row_id"), Operator::Eq, lit(1)), ) .into()]; - exec_table_scan(table.clone(), projection, filters, None).await; + let expected_output = vec![ + "+-----+--------+", + "| a | row_id |", + "+-----+--------+", + "| 0 | 1 |", + "| 10 | 1 |", + "| 30 | 1 |", + "| 100 | 1 |", + "+-----+--------+", + ]; + exec_table_scan(table.clone(), projection, filters, 4, expected_output).await; } async fn exec_table_scan( table: TableRef, projection: Option>, filters: Vec, - limit: Option, + expected_partitions: usize, + expected_output: Vec<&str>, ) { let table_scan = table - .scan(&projection, filters.as_slice(), limit) + .scan(&projection, filters.as_slice(), None) .await .unwrap(); + assert_eq!( + table_scan.output_partitioning().partition_count(), + expected_partitions + ); - for partition in 0..table_scan.output_partitioning().partition_count() { - let result = table_scan - .execute(partition, Arc::new(RuntimeEnv::default())) - .unwrap(); - let recordbatches = util::collect(result).await.unwrap(); + let merge = + CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(table_scan.clone()))); - let df_recordbatch = recordbatches - .into_iter() - .map(|r| r.df_recordbatch) - .collect::>(); + let sort = SortExec::try_new( + vec![PhysicalSortExpr { + expr: physical_col("a", table_scan.schema().arrow_schema()).unwrap(), + options: SortOptions::default(), + }], + Arc::new(merge), + ) + .unwrap(); + assert_eq!(sort.output_partitioning().partition_count(), 1); - println!("DataFusion partition {}:", partition); - let pretty_print = arrow_print::write(&df_recordbatch); - let pretty_print = pretty_print.lines().collect::>(); - pretty_print.iter().for_each(|x| println!("{}", x)); - } + let stream = sort + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); + let stream = Box::pin(RecordBatchStreamAdapter::try_new(stream).unwrap()); + + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + recordbatches.pretty_print().lines().collect::>(), + expected_output + ); } async fn new_dist_table() -> DistTable { @@ -824,52 +902,13 @@ mod test { ]; let schema = Arc::new(Schema::new(column_schemas.clone())); - let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _; - let meta_srv = - meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await; - - let datanode_clients = Arc::new(DatanodeClients::new()); - - let mut datanode_instances = HashMap::new(); - for datanode_id in 1..=4 { - let dn_instance = create_datanode_instance(datanode_id, meta_srv.clone()).await; - datanode_instances.insert(datanode_id, dn_instance.clone()); - - let (addr, client) = crate::tests::create_datanode_client(dn_instance).await; - datanode_clients - .insert_client(Peer::new(datanode_id, addr), client) - .await; - } - - let MockInfo { - server_addr, - channel_manager, - } = meta_srv.clone(); - let mut meta_client = MetaClientBuilder::new(1000, 0) - .enable_router() - .enable_store() - .channel_manager(channel_manager) - .build(); - meta_client.start(&[&server_addr]).await.unwrap(); - let meta_client = Arc::new(meta_client); + let (dist_instance, datanode_instances) = crate::tests::create_dist_instance().await; + let catalog_manager = dist_instance.catalog_manager(); + let table_routes = catalog_manager.table_routes(); + let datanode_clients = catalog_manager.datanode_clients(); let table_name = TableName::new("greptime", "public", "dist_numbers"); - let meta_backend = Arc::new(MetaKvBackend { - client: meta_client.clone(), - }); - let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); - let catalog_manager = Arc::new(FrontendCatalogManager::new( - meta_backend, - table_routes.clone(), - datanode_clients.clone(), - )); - let dist_instance = DistInstance::new( - meta_client.clone(), - catalog_manager, - datanode_clients.clone(), - ); - let sql = " CREATE TABLE greptime.public.dist_numbers ( ts BIGINT, @@ -893,17 +932,16 @@ mod test { _ => unreachable!(), }; - wait_datanodes_alive(kv_store).await; - - let factory = DefaultCreateExprFactory {}; - let mut expr = factory.create_expr_by_stmt(&create_table).await.unwrap(); + let mut expr = DefaultCreateExprFactory + .create_expr_by_stmt(&create_table) + .await + .unwrap(); let _result = dist_instance .create_table(&mut expr, create_table.partitions) .await .unwrap(); let table_route = table_routes.get_route(&table_name).await.unwrap(); - println!("{}", serde_json::to_string_pretty(&table_route).unwrap()); let mut region_to_datanode_mapping = HashMap::new(); for region_route in table_route.region_routes.iter() { @@ -948,20 +986,6 @@ mod test { } } - async fn wait_datanodes_alive(kv_store: KvStoreRef) { - let wait = 10; - for _ in 0..wait { - let datanodes = meta_srv::lease::alive_datanodes(1000, &kv_store, |_, _| true) - .await - .unwrap(); - if datanodes.len() >= 4 { - return; - } - tokio::time::sleep(Duration::from_secs(1)).await - } - panic!() - } - async fn insert_testing_data( table_name: &TableName, dn_instance: Arc, @@ -1013,30 +1037,6 @@ mod test { .unwrap(); } - async fn create_datanode_instance(datanode_id: u64, meta_srv: MockInfo) -> Arc { - let current = common_time::util::current_time_millis(); - let wal_tmp_dir = - TempDir::new_in("/tmp", &format!("dist_table_test-wal-{}", current)).unwrap(); - let data_tmp_dir = - TempDir::new_in("/tmp", &format!("dist_table_test-data-{}", current)).unwrap(); - let opts = DatanodeOptions { - node_id: Some(datanode_id), - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), - storage: ObjectStoreConfig::File { - data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), - }, - ..Default::default() - }; - - let instance = Arc::new( - Instance::with_mock_meta_server(&opts, meta_srv) - .await - .unwrap(), - ); - instance.start().await.unwrap(); - instance - } - #[tokio::test(flavor = "multi_thread")] async fn test_find_regions() { let schema = Arc::new(Schema::new(vec![ColumnSchema::new( diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 7e59bb3908..4cb1360fea 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -12,17 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; +use catalog::remote::MetaKvBackend; use client::Client; use common_grpc::channel_manager::ChannelManager; use common_runtime::Builder as RuntimeBuilder; +use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; use datanode::instance::Instance as DatanodeInstance; +use meta_client::client::MetaClientBuilder; +use meta_client::rpc::Peer; +use meta_srv::metasrv::MetaSrvOptions; +use meta_srv::mocks::MockInfo; +use meta_srv::service::store::kv::KvStoreRef; +use meta_srv::service::store::memory::MemStore; use servers::grpc::GrpcServer; +use tempdir::TempDir; use tonic::transport::Server; use tower::service_fn; +use crate::catalog::FrontendCatalogManager; +use crate::datanode::DatanodeClients; +use crate::instance::distributed::DistInstance; use crate::instance::Instance; +use crate::table::route::TableRoutes; async fn create_datanode_instance() -> Arc { // TODO(LFC) Use real Mito engine when we can alter its region schema, @@ -35,11 +50,10 @@ async fn create_datanode_instance() -> Arc { pub(crate) async fn create_frontend_instance() -> Arc { let datanode_instance: Arc = create_datanode_instance().await; let dn_catalog_manager = datanode_instance.catalog_manager().clone(); - let (_, client) = create_datanode_client(datanode_instance).await; - Arc::new(Instance::with_client_and_catalog_manager( - client, - dn_catalog_manager, - )) + + let mut frontend_instance = Instance::new_standalone(datanode_instance); + frontend_instance.set_catalog_manager(dn_catalog_manager); + Arc::new(frontend_instance) } pub(crate) async fn create_datanode_client( @@ -96,3 +110,91 @@ pub(crate) async fn create_datanode_client( Client::with_manager_and_urls(channel_manager, vec![addr]), ) } + +async fn create_dist_datanode_instance( + datanode_id: u64, + meta_srv: MockInfo, +) -> Arc { + let current = common_time::util::current_time_millis(); + let wal_tmp_dir = TempDir::new_in("/tmp", &format!("dist_datanode-wal-{}", current)).unwrap(); + let data_tmp_dir = TempDir::new_in("/tmp", &format!("dist_datanode-data-{}", current)).unwrap(); + let opts = DatanodeOptions { + node_id: Some(datanode_id), + wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + storage: ObjectStoreConfig::File { + data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + }, + ..Default::default() + }; + + let instance = Arc::new( + DatanodeInstance::with_mock_meta_server(&opts, meta_srv) + .await + .unwrap(), + ); + instance.start().await.unwrap(); + instance +} + +async fn wait_datanodes_alive(kv_store: KvStoreRef) { + let wait = 10; + for _ in 0..wait { + let datanodes = meta_srv::lease::alive_datanodes(1000, &kv_store, |_, _| true) + .await + .unwrap(); + if datanodes.len() >= 4 { + return; + } + tokio::time::sleep(Duration::from_secs(1)).await + } + panic!() +} + +pub(crate) async fn create_dist_instance() -> (DistInstance, HashMap>) { + let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _; + let meta_srv = meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await; + + let datanode_clients = Arc::new(DatanodeClients::new()); + + let mut datanode_instances = HashMap::new(); + for datanode_id in 1..=4 { + let dn_instance = create_dist_datanode_instance(datanode_id, meta_srv.clone()).await; + datanode_instances.insert(datanode_id, dn_instance.clone()); + + let (addr, client) = create_datanode_client(dn_instance).await; + datanode_clients + .insert_client(Peer::new(datanode_id, addr), client) + .await; + } + + let MockInfo { + server_addr, + channel_manager, + } = meta_srv.clone(); + let mut meta_client = MetaClientBuilder::new(1000, 0) + .enable_router() + .enable_store() + .channel_manager(channel_manager) + .build(); + meta_client.start(&[&server_addr]).await.unwrap(); + let meta_client = Arc::new(meta_client); + + let meta_backend = Arc::new(MetaKvBackend { + client: meta_client.clone(), + }); + let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); + let catalog_manager = Arc::new(FrontendCatalogManager::new( + meta_backend, + table_routes.clone(), + datanode_clients.clone(), + )); + + wait_datanodes_alive(kv_store).await; + + let dist_instance = DistInstance::new( + meta_client.clone(), + catalog_manager, + datanode_clients.clone(), + ); + (dist_instance, datanode_instances) +} diff --git a/src/script/src/python/builtins/mod.rs b/src/script/src/python/builtins/mod.rs index 5bee6e5577..679d91289b 100644 --- a/src/script/src/python/builtins/mod.rs +++ b/src/script/src/python/builtins/mod.rs @@ -15,7 +15,6 @@ //! Builtin module contains GreptimeDB builtin udf/udaf #[cfg(test)] -#[allow(clippy::print_stdout)] mod test; use datafusion_common::{DataFusionError, ScalarValue}; diff --git a/src/script/src/python/builtins/test.rs b/src/script/src/python/builtins/test.rs index 8fdeb9ad94..39caf399e2 100644 --- a/src/script/src/python/builtins/test.rs +++ b/src/script/src/python/builtins/test.rs @@ -18,6 +18,7 @@ use std::io::Read; use std::path::Path; use std::sync::Arc; +use common_telemetry::{error, info}; use datatypes::arrow::array::{Float64Array, Int64Array, PrimitiveArray}; use datatypes::arrow::compute::cast::CastOptions; use datatypes::arrow::datatypes::DataType; @@ -331,6 +332,8 @@ impl PyValue { #[test] fn run_builtin_fn_testcases() { + common_telemetry::init_default_ut_logging(); + let loc = Path::new("src/python/builtins/testcases.ron"); let loc = loc.to_str().expect("Fail to parse path"); let mut file = File::open(loc).expect("Fail to open file"); @@ -343,7 +346,7 @@ fn run_builtin_fn_testcases() { PyVector::make_class(&vm.ctx); }); for (idx, case) in testcases.into_iter().enumerate() { - print!("Testcase {idx} ..."); + info!("Testcase {idx} ..."); cached_vm .enter(|vm| { let scope = vm.new_scope_with_builtins(); @@ -368,7 +371,7 @@ fn run_builtin_fn_testcases() { let err_res = format_py_error(e, vm).to_string(); match case.expect{ Ok(v) => { - println!("\nError:\n{err_res}"); + error!("\nError:\n{err_res}"); panic!("Expect Ok: {v:?}, found Error"); }, Err(err) => { @@ -397,7 +400,6 @@ fn run_builtin_fn_testcases() { } }; }); - println!(" passed!"); } } @@ -443,6 +445,8 @@ fn set_lst_of_vecs_in_scope( #[allow(unused_must_use)] #[test] fn test_vm() { + common_telemetry::init_default_ut_logging(); + rustpython_vm::Interpreter::with_init(Default::default(), |vm| { vm.add_native_module("udf_builtins", Box::new(greptime_builtin::make_module)); // this can be in `.enter()` closure, but for clearity, put it in the `with_init()` @@ -471,11 +475,10 @@ sin(values)"#, .map_err(|err| vm.new_syntax_error(&err)) .unwrap(); let res = vm.run_code_obj(code_obj, scope); - println!("{:#?}", res); match res { Err(e) => { let err_res = format_py_error(e, vm).to_string(); - println!("Error:\n{err_res}"); + error!("Error:\n{err_res}"); } Ok(obj) => { let _ser = PyValue::from_py_obj(&obj, vm); diff --git a/src/script/src/python/test.rs b/src/script/src/python/test.rs index 5790ce281c..4c0bcdcd25 100644 --- a/src/script/src/python/test.rs +++ b/src/script/src/python/test.rs @@ -20,6 +20,7 @@ use std::io::prelude::*; use std::path::Path; use std::sync::Arc; +use common_telemetry::{error, info}; use console::style; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow::array::PrimitiveArray; @@ -82,6 +83,8 @@ fn create_sample_recordbatch() -> DfRecordBatch { /// and exec/parse (depending on the type of predicate) then decide if result is as expected #[test] fn run_ron_testcases() { + common_telemetry::init_default_ut_logging(); + let loc = Path::new("src/python/testcases.ron"); let loc = loc.to_str().expect("Fail to parse path"); let mut file = File::open(loc).expect("Fail to open file"); @@ -89,9 +92,9 @@ fn run_ron_testcases() { file.read_to_string(&mut buf) .expect("Fail to read to string"); let testcases: Vec = from_ron_string(&buf).expect("Fail to convert to testcases"); - println!("Read {} testcases from {}", testcases.len(), loc); + info!("Read {} testcases from {}", testcases.len(), loc); for testcase in testcases { - print!(".ron test {}", testcase.name); + info!(".ron test {}", testcase.name); match testcase.predicate { Predicate::ParseIsOk { result } => { let copr = parse_and_compile_copr(&testcase.code); @@ -101,21 +104,19 @@ fn run_ron_testcases() { } Predicate::ParseIsErr { reason } => { let copr = parse_and_compile_copr(&testcase.code); - if copr.is_ok() { - eprintln!("Expect to be err, found{copr:#?}"); - panic!() - } + assert!(copr.is_err(), "Expect to be err, actual {copr:#?}"); + let res = &copr.unwrap_err(); - println!( + error!( "{}", pretty_print_error_in_src(&testcase.code, res, 0, "") ); let (res, _) = get_error_reason_loc(res); - if !res.contains(&reason) { - eprintln!("{}", testcase.code); - eprintln!("Parse Error, expect \"{reason}\" in \"{res}\", but not found."); - panic!() - } + assert!( + res.contains(&reason), + "{} Parse Error, expect \"{reason}\" in \"{res}\", actual not found.", + testcase.code, + ); } Predicate::ExecIsOk { fields, columns } => { let rb = create_sample_recordbatch(); @@ -129,28 +130,25 @@ fn run_ron_testcases() { .iter() .zip(&res.schema.arrow_schema().fields) .map(|(anno, real)| { - if !(anno.datatype.clone().unwrap() == real.data_type - && anno.is_nullable == real.is_nullable) - { - eprintln!("fields expect to be {anno:#?}, found to be {real:#?}."); - panic!() - } + assert!( + anno.datatype.clone().unwrap() == real.data_type + && anno.is_nullable == real.is_nullable, + "Fields expected to be {anno:#?}, actual {real:#?}" + ); }) .count(); columns .iter() .zip(res.df_recordbatch.columns()) .map(|(anno, real)| { - if !(&anno.ty == real.data_type() && anno.len == real.len()) { - eprintln!( - "Unmatch type or length!Expect [{:#?}; {}], found [{:#?}; {}]", - anno.ty, - anno.len, - real.data_type(), - real.len() - ); - panic!() - } + assert!( + &anno.ty == real.data_type() && anno.len == real.len(), + "Type or length not match! Expect [{:#?}; {}], actual [{:#?}; {}]", + anno.ty, + anno.len, + real.data_type(), + real.len() + ); }) .count(); } @@ -159,28 +157,24 @@ fn run_ron_testcases() { } => { let rb = create_sample_recordbatch(); let res = coprocessor::exec_coprocessor(&testcase.code, &rb); + assert!(res.is_err(), "{:#?}\nExpect Err(...), actual Ok(...)", res); if let Err(res) = res { - println!( + error!( "{}", pretty_print_error_in_src(&testcase.code, &res, 1120, "") ); let (reason, _) = get_error_reason_loc(&res); - if !reason.contains(&part_reason) { - eprintln!( - "{}\nExecute error, expect \"{reason}\" in \"{res}\", but not found.", - testcase.code, - reason = style(reason).green(), - res = style(res).red() - ); - panic!() - } - } else { - eprintln!("{:#?}\nExpect Err(...), found Ok(...)", res); - panic!(); + assert!( + reason.contains(&part_reason), + "{}\nExecute error, expect \"{reason}\" in \"{res}\", actual not found.", + testcase.code, + reason = style(reason).green(), + res = style(res).red() + ) } } } - println!(" ... {}", style("ok✅").green()); + info!(" ... {}", style("ok✅").green()); } } @@ -275,7 +269,7 @@ def calc_rvs(open_time, close): 0, "copr.py", ); - println!("{res}"); + info!("{res}"); } else if let Ok(res) = ret { dbg!(&res); } else { @@ -319,7 +313,7 @@ def a(cpu, mem): 0, "copr.py", ); - println!("{res}"); + info!("{res}"); } else if let Ok(res) = ret { dbg!(&res); } else { diff --git a/src/script/src/python/vector.rs b/src/script/src/python/vector.rs index 951ad2f953..448df3e62e 100644 --- a/src/script/src/python/vector.rs +++ b/src/script/src/python/vector.rs @@ -1039,6 +1039,7 @@ pub mod tests { use std::sync::Arc; + use common_telemetry::info; use datatypes::vectors::{Float32Vector, Int32Vector, NullVector}; use rustpython_vm::builtins::PyList; use rustpython_vm::class::PyClassImpl; @@ -1170,9 +1171,10 @@ pub mod tests { } #[test] - #[allow(clippy::print_stdout)] // for debug purpose, also this is already a test function so allow print_stdout shouldn't be a problem? fn test_execute_script() { + common_telemetry::init_default_ut_logging(); + fn is_eq( v: PyResult, i: T, @@ -1221,7 +1223,7 @@ pub mod tests { for (code, pred) in snippet { let result = execute_script(&interpreter, code, None, pred); - println!( + info!( "\u{001B}[35m{code}\u{001B}[0m: {:?}{}", result.clone().map(|v| v.0), result diff --git a/src/session/src/context.rs b/src/session/src/context.rs index aec55ac941..2a6f9bbe72 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -30,6 +30,10 @@ impl Default for QueryContext { } impl QueryContext { + pub fn arc() -> QueryContextRef { + Arc::new(QueryContext::new()) + } + pub fn new() -> Self { Self { current_schema: ArcSwapOption::new(None), diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 70a3355f3d..501879023b 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -29,8 +29,6 @@ use datanode::instance::{Instance, InstanceRef}; use datanode::sql::SqlHandler; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; -use frontend::frontend::FrontendOptions; -use frontend::grpc::GrpcOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use object_store::backend::s3; use object_store::test_util::TempFolder; @@ -215,8 +213,7 @@ pub async fn create_test_table( } async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance { - let fe_opts = FrontendOptions::default(); - let mut frontend_instance = FeInstance::try_new(&fe_opts).await.unwrap(); + let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone()); frontend_instance.set_catalog_manager(datanode_instance.catalog_manager().clone()); frontend_instance.set_script_handler(datanode_instance); frontend_instance @@ -262,19 +259,13 @@ pub async fn setup_test_app_with_frontend( pub async fn setup_grpc_server( store_type: StorageType, name: &str, -) -> (String, TestGuard, Arc, Arc) { +) -> (String, TestGuard, Arc) { common_telemetry::init_default_ut_logging(); - let datanode_port = get_port(); - let frontend_port = get_port(); - - let (mut opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let datanode_grpc_addr = format!("127.0.0.1:{}", datanode_port); - opts.rpc_addr = datanode_grpc_addr.clone(); + let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); instance.start().await.unwrap(); - let datanode_grpc_addr = datanode_grpc_addr.clone(); let runtime = Arc::new( RuntimeBuilder::default() .worker_threads(2) @@ -283,26 +274,9 @@ pub async fn setup_grpc_server( .unwrap(), ); - let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port); - let fe_opts = FrontendOptions { - mode: Mode::Standalone, - datanode_rpc_addr: datanode_grpc_addr.clone(), - grpc_options: Some(GrpcOptions { - addr: fe_grpc_addr.clone(), - runtime_size: 8, - }), - ..Default::default() - }; + let fe_grpc_addr = format!("127.0.0.1:{}", get_port()); - let datanode_grpc_server = Arc::new(GrpcServer::new( - instance.clone(), - instance.clone(), - runtime.clone(), - )); - - let mut fe_instance = frontend::instance::Instance::try_new(&fe_opts) - .await - .unwrap(); + let mut fe_instance = frontend::instance::Instance::new_standalone(instance.clone()); fe_instance.set_catalog_manager(instance.catalog_manager().clone()); let fe_instance_ref = Arc::new(fe_instance); @@ -319,15 +293,8 @@ pub async fn setup_grpc_server( grpc_server_clone.start(addr).await.unwrap() }); - let dn_grpc_addr_clone = datanode_grpc_addr.clone(); - let dn_grpc_server_clone = datanode_grpc_server.clone(); - tokio::spawn(async move { - let addr = dn_grpc_addr_clone.parse::().unwrap(); - dn_grpc_server_clone.start(addr).await.unwrap() - }); - // wait for GRPC server to start tokio::time::sleep(Duration::from_secs(1)).await; - (fe_grpc_addr, guard, fe_grpc_server, datanode_grpc_server) + (fe_grpc_addr, guard, fe_grpc_server) } diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index cf6ba4b922..6f94aff3e5 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -61,14 +61,13 @@ macro_rules! grpc_tests { } pub async fn test_auto_create_table(store_type: StorageType) { - let (addr, mut guard, fe_grpc_server, dn_grpc_server) = + let (addr, mut guard, fe_grpc_server) = setup_grpc_server(store_type, "auto_create_table").await; let grpc_client = Client::with_urls(vec![addr]); let db = Database::new("greptime", grpc_client); insert_and_assert(&db).await; let _ = fe_grpc_server.shutdown().await; - let _ = dn_grpc_server.shutdown().await; guard.remove_all().await; } @@ -128,7 +127,7 @@ fn expect_data() -> (Column, Column, Column, Column) { pub async fn test_insert_and_select(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (addr, mut guard, fe_grpc_server, dn_grpc_server) = + let (addr, mut guard, fe_grpc_server) = setup_grpc_server(store_type, "insert_and_select").await; let grpc_client = Client::with_urls(vec![addr]); @@ -173,7 +172,6 @@ pub async fn test_insert_and_select(store_type: StorageType) { insert_and_assert(&db).await; let _ = fe_grpc_server.shutdown().await; - let _ = dn_grpc_server.shutdown().await; guard.remove_all().await; }