diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 4bb5be9bf2..1b38f98290 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -96,7 +96,7 @@ impl Instance { } }; - let query_engine = factory.query_engine().clone(); + let query_engine = factory.query_engine(); let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index edef85601a..942f72c4f6 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -44,7 +44,7 @@ impl Instance { ); let factory = QueryEngineFactory::new(catalog_manager.clone()); - let query_engine = factory.query_engine().clone(); + let query_engine = factory.query_engine(); let sql_handler = SqlHandler::new(mock_engine.clone(), catalog_manager.clone()); let physical_planner = PhysicalPlanner::new(query_engine.clone()); @@ -100,7 +100,7 @@ impl Instance { )); let factory = QueryEngineFactory::new(catalog_manager.clone()); - let query_engine = factory.query_engine().clone(); + let query_engine = factory.query_engine(); let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 0b2d7a11f8..b3cf4794db 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -214,7 +214,7 @@ mod tests { .unwrap(), ); let factory = QueryEngineFactory::new(catalog_list.clone()); - let query_engine = factory.query_engine().clone(); + let query_engine = factory.query_engine(); let sql_handler = SqlHandler::new(table_engine, catalog_list); let stmt = match query_engine.sql_to_statement(sql).unwrap() { diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index dfe9be93ff..f781423bf4 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -343,6 +343,13 @@ pub enum Error { #[snafu(backtrace)] source: client::Error, }, + + #[snafu(display("Failed to execute sql: {}, source: {}", sql, source))] + ExecuteSql { + sql: String, + #[snafu(backtrace)] + source: query::error::Error, + }, } pub type Result = std::result::Result; @@ -415,6 +422,8 @@ impl ErrorExt for Error { Error::Select { source, .. } => source.status_code(), Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(), Error::DeserializeInsertBatch { source, .. } => source.status_code(), + + Error::ExecuteSql { source, .. } => source.status_code(), } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 7fe470e07f..71c9fcf3da 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -145,13 +145,13 @@ impl Instance { }); let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); let datanode_clients = Arc::new(DatanodeClients::new()); - let catalog_manager = FrontendCatalogManager::new( + let catalog_manager = Arc::new(FrontendCatalogManager::new( meta_backend, table_routes, datanode_clients.clone(), - ); + )); - instance.catalog_manager = Some(Arc::new(catalog_manager.clone())); + instance.catalog_manager = Some(catalog_manager.clone()); Some(DistInstance::new( meta_client, @@ -178,11 +178,15 @@ impl Instance { } pub async fn handle_select(&self, expr: Select) -> Result { - self.database() - .select(expr) - .await - .and_then(Output::try_from) - .context(SelectSnafu) + if let Some(dist_instance) = &self.dist_instance { + dist_instance.handle_select(expr).await + } else { + self.database() + .select(expr) + .await + .and_then(Output::try_from) + .context(SelectSnafu) + } } /// Convert `CreateTable` statement to `CreateExpr` gRPC request. diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index ebd1117564..cfd46c4184 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use chrono::DateTime; use client::admin::{admin_result_to_output, Admin}; +use client::Select; use common_catalog::{TableGlobalKey, TableGlobalValue}; use common_query::Output; use common_telemetry::debug; @@ -12,6 +13,7 @@ use meta_client::rpc::{ CreateRequest as MetaCreateRequest, Partition as MetaPartition, RouteResponse, TableName, TableRoute, }; +use query::{QueryEngineFactory, QueryEngineRef}; use snafu::{ensure, OptionExt, ResultExt}; use sql::statements::create::CreateTable; use sql::statements::{ @@ -31,20 +33,23 @@ use crate::partitioning::{PartitionBound, PartitionDef}; #[derive(Clone)] pub(crate) struct DistInstance { meta_client: Arc, - catalog_manager: FrontendCatalogManager, + catalog_manager: Arc, datanode_clients: Arc, + query_engine: QueryEngineRef, } impl DistInstance { pub(crate) fn new( meta_client: Arc, - catalog_manager: FrontendCatalogManager, + catalog_manager: Arc, datanode_clients: Arc, ) -> Self { + let query_engine = QueryEngineFactory::new(catalog_manager.clone()).query_engine(); Self { meta_client, catalog_manager, datanode_clients, + query_engine, } } @@ -96,6 +101,18 @@ impl DistInstance { Ok(Output::AffectedRows(region_routes.len())) } + pub(crate) async fn handle_select(&self, select: Select) -> Result { + let Select::Sql(sql) = select; + let plan = self + .query_engine + .sql_to_plan(&sql) + .with_context(|_| error::ExecuteSqlSnafu { sql: sql.clone() })?; + self.query_engine + .execute(&plan) + .await + .context(error::ExecuteSqlSnafu { sql }) + } + async fn create_table_in_meta(&self, create_table: &CreateTable) -> Result { let (catalog, schema, table) = table_idents_to_full_name(&create_table.name).context(error::ParseSqlSnafu)?; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 1ce458aa28..34d2e040c9 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -434,6 +434,8 @@ impl PartitionExec { #[allow(clippy::print_stdout)] #[cfg(test)] mod test { + use std::time::Duration; + use api::v1::codec::InsertBatch; use api::v1::column::SemanticType; use api::v1::{column, insert_expr, Column, ColumnDataType}; @@ -652,8 +654,6 @@ mod test { } #[tokio::test(flavor = "multi_thread")] - // FIXME(LFC): Remove ignore when auto create table upon insertion is ready. - #[ignore] async fn test_dist_table_scan() { common_telemetry::init_default_ut_logging(); let table = Arc::new(new_dist_table().await); @@ -774,11 +774,11 @@ mod test { client: meta_client.clone(), }); let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); - let catalog_manager = FrontendCatalogManager::new( + let catalog_manager = Arc::new(FrontendCatalogManager::new( meta_backend, table_routes.clone(), datanode_clients.clone(), - ); + )); let dist_instance = DistInstance::new( meta_client.clone(), catalog_manager, @@ -807,6 +807,9 @@ mod test { Statement::CreateTable(c) => c, _ => unreachable!(), }; + + wait_datanodes_alive(kv_store).await; + let _result = dist_instance.create_table(&create_table).await.unwrap(); let table_route = table_routes.get_route(&table_name).await.unwrap(); @@ -844,6 +847,20 @@ mod test { } } + async fn wait_datanodes_alive(kv_store: KvStoreRef) { + let wait = 10; + for _ in 0..wait { + let datanodes = meta_srv::lease::alive_datanodes(1000, &kv_store, |_, _| true) + .await + .unwrap(); + if datanodes.len() >= 4 { + return; + } + tokio::time::sleep(Duration::from_secs(1)).await + } + panic!() + } + async fn insert_testing_data( table_name: &TableName, dn_instance: Arc, diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index b2e256348d..79744f8fb5 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -3,7 +3,7 @@ pub mod bootstrap; pub mod error; pub mod handler; mod keys; -mod lease; +pub mod lease; pub mod metasrv; #[cfg(feature = "mock")] pub mod mocks; diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index bba9ff86e1..246f6bc364 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -259,8 +259,7 @@ mod tests { .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog) .unwrap(); - let factory = QueryEngineFactory::new(catalog_list); - factory.query_engine().clone() + QueryEngineFactory::new(catalog_list).query_engine() } #[test] diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 97e09959a4..afbfe91e3f 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -59,8 +59,8 @@ impl QueryEngineFactory { } impl QueryEngineFactory { - pub fn query_engine(&self) -> &Arc { - &self.query_engine + pub fn query_engine(&self) -> QueryEngineRef { + self.query_engine.clone() } } diff --git a/src/query/tests/function.rs b/src/query/tests/function.rs index 79d54bf5fc..895e7cf5d5 100644 --- a/src/query/tests/function.rs +++ b/src/query/tests/function.rs @@ -53,8 +53,7 @@ pub fn create_query_engine() -> Arc { .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) .unwrap(); - let factory = QueryEngineFactory::new(catalog_list); - factory.query_engine().clone() + QueryEngineFactory::new(catalog_list).query_engine() } pub async fn get_numbers_from_table<'s, T>( diff --git a/src/query/tests/percentile_test.rs b/src/query/tests/percentile_test.rs index e809554b08..56e17d2575 100644 --- a/src/query/tests/percentile_test.rs +++ b/src/query/tests/percentile_test.rs @@ -143,6 +143,5 @@ fn create_correctness_engine() -> Arc { .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) .unwrap(); - let factory = QueryEngineFactory::new(catalog_list); - factory.query_engine().clone() + QueryEngineFactory::new(catalog_list).query_engine() } diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index fde9f951c1..d0af4ab902 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -216,8 +216,7 @@ fn create_query_engine() -> Arc { .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) .unwrap(); - let factory = QueryEngineFactory::new(catalog_list); - factory.query_engine().clone() + QueryEngineFactory::new(catalog_list).query_engine() } async fn get_numbers_from_table<'s, T>( diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index 86baa4be2d..1b476f5fc5 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -124,7 +124,7 @@ mod tests { ); let factory = QueryEngineFactory::new(catalog_manager.clone()); - let query_engine = factory.query_engine().clone(); + let query_engine = factory.query_engine(); let mgr = ScriptManager::new(catalog_manager.clone(), query_engine) .await .unwrap(); diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 065f7dec7d..ea4d7ec61a 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -80,6 +80,6 @@ fn create_testing_sql_query_handler(table: MemTable) -> SqlQueryHandlerRef { .unwrap(); let factory = QueryEngineFactory::new(catalog_list); - let query_engine = factory.query_engine().clone(); + let query_engine = factory.query_engine(); Arc::new(DummyInstance::new(query_engine)) }