From 48c2841e4ddb57ce521970f70e4e71f97b4a5cdf Mon Sep 17 00:00:00 2001 From: LFC Date: Sun, 2 Apr 2023 20:36:48 +0800 Subject: [PATCH] feat: execute python script in distributed mode (#1264) * feat: execute python script in distributed mode * fix: rebase develop --- Cargo.lock | 2 +- src/catalog/src/error.rs | 9 +- src/cmd/src/standalone.rs | 5 +- src/datanode/Cargo.toml | 5 - src/datanode/src/error.rs | 7 - src/datanode/src/instance.rs | 6 - src/datanode/src/lib.rs | 1 - src/datanode/src/metrics.rs | 2 - src/frontend/Cargo.toml | 5 + src/frontend/src/catalog.rs | 175 +++++++++++++++++- src/frontend/src/error.rs | 8 + src/frontend/src/expr_factory.rs | 5 + src/frontend/src/instance.rs | 105 ++++------- src/frontend/src/instance/distributed.rs | 21 +-- src/frontend/src/instance/distributed/grpc.rs | 3 +- src/frontend/src/instance/grpc.rs | 9 +- .../src/instance/script.rs | 6 +- src/frontend/src/lib.rs | 3 + src/frontend/src/metric.rs | 17 ++ src/{datanode => frontend}/src/script.rs | 0 src/frontend/src/tests.rs | 27 +-- src/script/src/lib.rs | 2 +- src/script/src/table.rs | 2 +- tests-integration/src/test_util.rs | 33 ++-- tests-integration/tests/http.rs | 2 +- tests/cases/distributed/catalog/schema.result | 18 +- tests/runner/src/env.rs | 28 +-- 27 files changed, 343 insertions(+), 163 deletions(-) rename src/{datanode => frontend}/src/instance/script.rs (89%) create mode 100644 src/frontend/src/metric.rs rename src/{datanode => frontend}/src/script.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index af62adce23..f2beb85e88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2413,7 +2413,6 @@ dependencies = [ "prost", "query", "regex", - "script", "serde", "serde_json", "servers", @@ -2920,6 +2919,7 @@ dependencies = [ "prost", "query", "rustls", + "script", "serde", "serde_json", "servers", diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 137286207d..322daeb621 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -219,6 +219,12 @@ pub enum Error { #[snafu(backtrace)] source: table::error::Error, }, + + #[snafu(display("Invalid system table definition: {err_msg}"))] + InvalidSystemTableDef { + err_msg: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -231,7 +237,8 @@ impl ErrorExt for Error { | Error::TableNotFound { .. } | Error::IllegalManagerState { .. } | Error::CatalogNotFound { .. } - | Error::InvalidEntryType { .. } => StatusCode::Unexpected, + | Error::InvalidEntryType { .. } + | Error::InvalidSystemTableDef { .. } => StatusCode::Unexpected, Error::SystemCatalog { .. } | Error::EmptyValue { .. } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b5fcfa35e3..fa4a4e4912 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -236,8 +236,9 @@ async fn build_frontend( plugins: Arc, datanode_instance: InstanceRef, ) -> Result { - let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone()); - frontend_instance.set_script_handler(datanode_instance); + let mut frontend_instance = FeInstance::try_new_standalone(datanode_instance.clone()) + .await + .context(StartFrontendSnafu)?; frontend_instance.set_plugins(plugins.clone()); Ok(frontend_instance) } diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 9935a6c67b..840a21780b 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -4,10 +4,6 @@ version.workspace = true edition.workspace = true license.workspace = true -[features] -default = ["python"] -python = ["dep:script"] - [dependencies] async-compat = "0.2" async-stream.workspace = true @@ -49,7 +45,6 @@ pin-project = "1.0" prost.workspace = true query = { path = "../query" } regex = "1.6" -script = { path = "../script", features = ["python"], optional = true } serde = "1.0" serde_json = "1.0" servers = { path = "../servers" } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 1346dcf35d..2d05189b42 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -314,12 +314,6 @@ pub enum Error { source: sql::error::Error, }, - #[snafu(display("Failed to start script manager, source: {}", source))] - StartScriptManager { - #[snafu(backtrace)] - source: script::error::Error, - }, - #[snafu(display( "Failed to parse string to timestamp, string: {}, source: {}", raw, @@ -601,7 +595,6 @@ impl ErrorExt for Error { | WriteObject { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, OpenLogStore { source } => source.status_code(), - StartScriptManager { source } => source.status_code(), OpenStorageEngine { source } => source.status_code(), RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted, MetaClientInit { source, .. } => source.status_code(), diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index c3860ddc4d..f5bb721162 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -58,11 +58,9 @@ use crate::error::{ NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu, }; use crate::heartbeat::HeartbeatTask; -use crate::script::ScriptExecutor; use crate::sql::{SqlHandler, SqlRequest}; mod grpc; -mod script; pub mod sql; pub(crate) type DefaultEngine = MitoEngine>; @@ -72,7 +70,6 @@ pub struct Instance { pub(crate) query_engine: QueryEngineRef, pub(crate) sql_handler: SqlHandler, pub(crate) catalog_manager: CatalogManagerRef, - pub(crate) script_executor: ScriptExecutor, pub(crate) table_id_provider: Option, pub(crate) heartbeat_task: Option, procedure_manager: Option, @@ -170,8 +167,6 @@ impl Instance { let factory = QueryEngineFactory::new(catalog_manager.clone()); let query_engine = factory.query_engine(); - let script_executor = - ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; let heartbeat_task = match opts.mode { Mode::Standalone => None, @@ -205,7 +200,6 @@ impl Instance { procedure_manager.clone(), ), catalog_manager, - script_executor, heartbeat_task, table_id_provider, procedure_manager, diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 862a8c8478..6f1f143a4a 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -21,7 +21,6 @@ mod heartbeat; pub mod instance; pub mod metrics; mod mock; -mod script; pub mod server; pub mod sql; #[cfg(test)] diff --git a/src/datanode/src/metrics.rs b/src/datanode/src/metrics.rs index 88f50b5ed4..75da00c8eb 100644 --- a/src/datanode/src/metrics.rs +++ b/src/datanode/src/metrics.rs @@ -15,6 +15,4 @@ //! datanode metrics pub const METRIC_HANDLE_SQL_ELAPSED: &str = "datanode.handle_sql_elapsed"; -pub const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "datanode.handle_scripts_elapsed"; -pub const METRIC_RUN_SCRIPT_ELAPSED: &str = "datanode.run_script_elapsed"; pub const METRIC_HANDLE_PROMQL_ELAPSED: &str = "datanode.handle_promql_elapsed"; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 498981bf5d..a2ebdbbdca 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -4,6 +4,10 @@ version.workspace = true edition.workspace = true license.workspace = true +[features] +default = ["python"] +python = ["dep:script"] + [dependencies] api = { path = "../api" } async-stream.workspace = true @@ -37,6 +41,7 @@ partition = { path = "../partition" } prost.workspace = true query = { path = "../query" } rustls = "0.20" +script = { path = "../script", features = ["python"], optional = true } serde = "1.0" serde_json = "1.0" servers = { path = "../servers" } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index f61b7412f2..11d0ae9f3c 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -16,8 +16,12 @@ use std::any::Any; use std::collections::HashSet; use std::sync::Arc; +use api::v1::CreateTableExpr; use async_trait::async_trait; -use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu, Result as CatalogResult}; +use catalog::error::{ + self as catalog_err, InternalSnafu, InvalidCatalogValueSnafu, InvalidSystemTableDefSnafu, + Result as CatalogResult, UnimplementedSnafu, +}; use catalog::helper::{ build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue, @@ -28,6 +32,7 @@ use catalog::{ RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef, }; +use common_error::prelude::BoxedError; use common_telemetry::error; use futures::StreamExt; use meta_client::rpc::TableName; @@ -36,6 +41,8 @@ use snafu::prelude::*; use table::TableRef; use crate::datanode::DatanodeClients; +use crate::expr_factory; +use crate::instance::distributed::DistInstance; use crate::table::DistTable; #[derive(Clone)] @@ -43,6 +50,12 @@ pub struct FrontendCatalogManager { backend: KvBackendRef, partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, + + // TODO(LFC): Remove this field. + // DistInstance in FrontendCatalogManager is only used for creating distributed script table now. + // Once we have some standalone distributed table creator (like create distributed table procedure), + // we should use that. + dist_instance: Option>, } impl FrontendCatalogManager { @@ -55,9 +68,14 @@ impl FrontendCatalogManager { backend, partition_manager, datanode_clients, + dist_instance: None, } } + pub(crate) fn set_dist_instance(&mut self, dist_instance: Arc) { + self.dist_instance = Some(dist_instance) + } + pub(crate) fn backend(&self) -> KvBackendRef { self.backend.clone() } @@ -106,9 +124,93 @@ impl CatalogManager for FrontendCatalogManager { async fn register_system_table( &self, - _request: RegisterSystemTableRequest, + request: RegisterSystemTableRequest, ) -> catalog::error::Result<()> { - unimplemented!() + if let Some(dist_instance) = &self.dist_instance { + let open_hook = request.open_hook; + let request = request.create_table_request; + + if let Some(table) = self + .table( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ) + .await? + { + if let Some(hook) = open_hook { + (hook)(table)?; + } + return Ok(()); + } + + let time_index = request + .schema + .column_schemas + .iter() + .find_map(|x| { + if x.is_time_index() { + Some(x.name.clone()) + } else { + None + } + }) + .context(InvalidSystemTableDefSnafu { + err_msg: "Time index is not defined.", + })?; + + let primary_keys = request + .schema + .column_schemas + .iter() + .enumerate() + .filter_map(|(i, x)| { + if request.primary_key_indices.contains(&i) { + Some(x.name.clone()) + } else { + None + } + }) + .collect(); + + let column_defs = expr_factory::column_schemas_to_defs(request.schema.column_schemas) + .map_err(|e| { + InvalidSystemTableDefSnafu { + err_msg: e.to_string(), + } + .build() + })?; + + let mut create_table = CreateTableExpr { + catalog_name: request.catalog_name, + schema_name: request.schema_name, + table_name: request.table_name, + desc: request.desc.unwrap_or("".to_string()), + column_defs, + time_index, + primary_keys, + create_if_not_exists: request.create_if_not_exists, + table_options: (&request.table_options).into(), + table_id: None, // Should and will be assigned by Meta. + region_ids: vec![0], + }; + + let table = dist_instance + .create_table(&mut create_table, None) + .await + .map_err(BoxedError::new) + .context(InternalSnafu)?; + + if let Some(hook) = open_hook { + (hook)(table)?; + } + Ok(()) + } else { + UnimplementedSnafu { + operation: "register system table", + } + .fail() + } } fn schema( @@ -330,3 +432,70 @@ impl SchemaProvider for FrontendSchemaProvider { Ok(self.table_names()?.contains(&name.to_string())) } } + +#[cfg(test)] +mod tests { + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use script::table::{build_scripts_schema, SCRIPTS_TABLE_NAME}; + use table::requests::{CreateTableRequest, TableOptions}; + + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn test_register_system_table() { + let instance = + crate::tests::create_distributed_instance("test_register_system_table").await; + + let catalog_name = DEFAULT_CATALOG_NAME; + let schema_name = DEFAULT_SCHEMA_NAME; + let table_name = SCRIPTS_TABLE_NAME; + let request = CreateTableRequest { + id: 1, + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + desc: Some("Scripts table".to_string()), + schema: build_scripts_schema(), + region_numbers: vec![0], + primary_key_indices: vec![0, 1], + create_if_not_exists: true, + table_options: TableOptions::default(), + }; + + let result = instance + .catalog_manager + .register_system_table(RegisterSystemTableRequest { + create_table_request: request, + open_hook: None, + }) + .await; + assert!(result.is_ok()); + + assert!( + instance + .catalog_manager + .table(catalog_name, schema_name, table_name) + .await + .unwrap() + .is_some(), + "the registered system table cannot be found in catalog" + ); + + let mut actually_created_table_in_datanode = 0; + for datanode in instance.datanodes.values() { + if datanode + .catalog_manager() + .table(catalog_name, schema_name, table_name) + .await + .unwrap() + .is_some() + { + actually_created_table_in_datanode += 1; + } + } + assert_eq!( + actually_created_table_in_datanode, 1, + "system table should be actually created at one and only one datanode" + ) + } +} diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 15eaccd965..2eec308447 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -378,6 +378,12 @@ pub enum Error { #[snafu(backtrace)] source: table::error::Error, }, + + #[snafu(display("Failed to start script manager, source: {}", source))] + StartScriptManager { + #[snafu(backtrace)] + source: script::error::Error, + }, } pub type Result = std::result::Result; @@ -462,6 +468,8 @@ impl ErrorExt for Error { source.status_code() } Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, + + Error::StartScriptManager { source } => source.status_code(), } } diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index fba82de8aa..e6628a9bac 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -187,7 +187,12 @@ fn columns_to_expr( .iter() .map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu)) .collect::>>()?; + column_schemas_to_defs(column_schemas) +} +pub(crate) fn column_schemas_to_defs( + column_schemas: Vec, +) -> Result> { let column_datatypes = column_schemas .iter() .map(|c| { diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 5dcfd0a5c7..d1f2f28c99 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -17,6 +17,7 @@ mod grpc; mod influxdb; mod opentsdb; mod prometheus; +mod script; mod standalone; use std::collections::HashMap; @@ -40,7 +41,6 @@ use common_telemetry::timer; use datafusion::sql::sqlparser::ast::ObjectName; use datanode::instance::sql::table_idents_to_full_name; use datanode::instance::InstanceRef as DnInstanceRef; -use datanode::metrics; use datatypes::schema::Schema; use distributed::DistInstance; use meta_client::client::{MetaClient, MetaClientBuilder}; @@ -58,7 +58,6 @@ use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler, - ScriptHandlerRef, }; use session::context::QueryContextRef; use snafu::prelude::*; @@ -80,6 +79,8 @@ use crate::error::{ use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; use crate::instance::standalone::StandaloneGrpcQueryHandler; +use crate::metric; +use crate::script::ScriptExecutor; use crate::server::{start_server, ServerHandlers, Services}; #[async_trait] @@ -103,9 +104,7 @@ pub type FrontendInstanceRef = Arc; #[derive(Clone)] pub struct Instance { catalog_manager: CatalogManagerRef, - - /// Script handler is None in distributed mode, only works on standalone mode. - script_handler: Option, + script_executor: Arc, statement_handler: StatementHandlerRef, query_engine: QueryEngineRef, grpc_query_handler: GrpcQueryHandlerRef, @@ -134,23 +133,29 @@ impl Instance { let partition_manager = Arc::new(PartitionRuleManager::new(table_routes)); let datanode_clients = Arc::new(DatanodeClients::default()); - let catalog_manager = Arc::new(FrontendCatalogManager::new( - meta_backend, - partition_manager, - datanode_clients.clone(), - )); + let mut catalog_manager = + FrontendCatalogManager::new(meta_backend, partition_manager, datanode_clients.clone()); - let dist_instance = - DistInstance::new(meta_client, catalog_manager.clone(), datanode_clients); + let dist_instance = DistInstance::new( + meta_client, + Arc::new(catalog_manager.clone()), + datanode_clients, + ); let dist_instance = Arc::new(dist_instance); + catalog_manager.set_dist_instance(dist_instance.clone()); + let catalog_manager = Arc::new(catalog_manager); + let query_engine = QueryEngineFactory::new_with_plugins(catalog_manager.clone(), plugins.clone()) .query_engine(); + let script_executor = + Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); + Ok(Instance { catalog_manager, - script_handler: None, + script_executor, create_expr_factory: Arc::new(DefaultCreateExprFactory), statement_handler: dist_instance.clone(), query_engine, @@ -191,18 +196,22 @@ impl Instance { Ok(Arc::new(meta_client)) } - pub fn new_standalone(dn_instance: DnInstanceRef) -> Self { - Instance { - catalog_manager: dn_instance.catalog_manager().clone(), - script_handler: None, + pub async fn try_new_standalone(dn_instance: DnInstanceRef) -> Result { + let catalog_manager = dn_instance.catalog_manager(); + let query_engine = dn_instance.query_engine(); + let script_executor = + Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); + Ok(Instance { + catalog_manager: catalog_manager.clone(), + script_executor, create_expr_factory: Arc::new(DefaultCreateExprFactory), statement_handler: dn_instance.clone(), - query_engine: dn_instance.query_engine(), + query_engine, grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()), promql_handler: Some(dn_instance.clone()), plugins: Default::default(), servers: Arc::new(HashMap::new()), - } + }) } pub async fn build_servers( @@ -217,12 +226,19 @@ impl Instance { } #[cfg(test)] - pub(crate) fn new_distributed(dist_instance: Arc) -> Self { - let catalog_manager = dist_instance.catalog_manager(); + pub(crate) async fn new_distributed( + catalog_manager: CatalogManagerRef, + dist_instance: Arc, + ) -> Self { let query_engine = QueryEngineFactory::new(catalog_manager.clone()).query_engine(); + let script_executor = Arc::new( + ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()) + .await + .unwrap(), + ); Instance { catalog_manager, - script_handler: None, + script_executor, statement_handler: dist_instance.clone(), query_engine, create_expr_factory: Arc::new(DefaultCreateExprFactory), @@ -237,14 +253,6 @@ impl Instance { &self.catalog_manager } - pub fn set_script_handler(&mut self, handler: ScriptHandlerRef) { - debug_assert!( - self.script_handler.is_none(), - "Script handler can be set only once!" - ); - self.script_handler = Some(handler); - } - /// Handle batch inserts pub async fn handle_inserts( &self, @@ -532,7 +540,7 @@ impl SqlQueryHandler for Instance { type Error = Error; async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { - let _timer = timer!(metrics::METRIC_HANDLE_SQL_ELAPSED); + let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED); let query_interceptor = self.plugins.get::>(); let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) { @@ -620,41 +628,6 @@ impl SqlQueryHandler for Instance { } } -#[async_trait] -impl ScriptHandler for Instance { - async fn insert_script( - &self, - schema: &str, - name: &str, - script: &str, - ) -> server_error::Result<()> { - if let Some(handler) = &self.script_handler { - handler.insert_script(schema, name, script).await - } else { - server_error::NotSupportedSnafu { - feat: "Script execution in Frontend", - } - .fail() - } - } - - async fn execute_script( - &self, - schema: &str, - script: &str, - params: HashMap, - ) -> server_error::Result { - if let Some(handler) = &self.script_handler { - handler.execute_script(schema, script, params).await - } else { - server_error::NotSupportedSnafu { - feat: "Script execution in Frontend", - } - .fail() - } - } -} - #[async_trait] impl PromHandler for Instance { async fn do_query(&self, query: &PromQuery) -> server_error::Result { diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index fef451de18..7553657b6b 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -56,6 +56,7 @@ use sql::statements::statement::Statement; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use table::requests::TableOptions; use table::table::AlterContext; +use table::TableRef; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; @@ -93,14 +94,14 @@ impl DistInstance { &self, create_table: &mut CreateTableExpr, partitions: Option, - ) -> Result { + ) -> Result { let table_name = TableName::new( &create_table.catalog_name, &create_table.schema_name, &create_table.table_name, ); - if self + if let Some(table) = self .catalog_manager .table( &table_name.catalog_name, @@ -109,10 +110,9 @@ impl DistInstance { ) .await .context(CatalogSnafu)? - .is_some() { return if create_table.create_if_not_exists { - Ok(Output::AffectedRows(0)) + Ok(table) } else { TableAlreadyExistSnafu { table: table_name.to_string(), @@ -153,20 +153,20 @@ impl DistInstance { create_table.table_id = Some(TableId { id: table_id }); - let table = DistTable::new( + let table = Arc::new(DistTable::new( table_name.clone(), table_info, self.catalog_manager.partition_manager(), self.catalog_manager.datanode_clients(), self.catalog_manager.backend(), - ); + )); let request = RegisterTableRequest { catalog: table_name.catalog_name.clone(), schema: table_name.schema_name.clone(), table_name: table_name.table_name.clone(), table_id, - table: Arc::new(table), + table: table.clone(), }; ensure!( self.catalog_manager @@ -196,9 +196,7 @@ impl DistInstance { .await .context(RequestDatanodeSnafu)?; } - - // Checked in real MySQL, it truly returns "0 rows affected". - Ok(Output::AffectedRows(0)) + Ok(table) } async fn drop_table(&self, table_name: TableName) -> Result { @@ -329,7 +327,8 @@ impl DistInstance { } Statement::CreateTable(stmt) => { let create_expr = &mut expr_factory::create_to_expr(&stmt, query_ctx)?; - Ok(self.create_table(create_expr, stmt.partitions).await?) + let _ = self.create_table(create_expr, stmt.partitions).await?; + Ok(Output::AffectedRows(0)) } Statement::Alter(alter_table) => { let expr = grpc::to_alter_expr(alter_table, query_ctx)?; diff --git a/src/frontend/src/instance/distributed/grpc.rs b/src/frontend/src/instance/distributed/grpc.rs index 0e41cc3f12..bfcb0233f4 100644 --- a/src/frontend/src/instance/distributed/grpc.rs +++ b/src/frontend/src/instance/distributed/grpc.rs @@ -49,7 +49,8 @@ impl GrpcQueryHandler for DistInstance { DdlExpr::CreateTable(mut expr) => { // TODO(LFC): Support creating distributed table through GRPC interface. // Currently only SQL supports it; how to design the fields in CreateTableExpr? - self.create_table(&mut expr, None).await + let _ = self.create_table(&mut expr, None).await; + Ok(Output::AffectedRows(0)) } DdlExpr::Alter(expr) => self.handle_alter_table(expr).await, DdlExpr::DropTable(expr) => { diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 6c65ecfc8f..20f3f706b7 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -383,8 +383,6 @@ CREATE TABLE {table_name} ( // Wait for previous task finished flush_table(frontend, "greptime", "public", table_name, None).await; - let table_id = 1024; - let table = instance .frontend .catalog_manager() @@ -394,7 +392,7 @@ CREATE TABLE {table_name} ( .unwrap(); let table = table.as_any().downcast_ref::().unwrap(); - let TableGlobalValue { regions_id_map, .. } = table + let tgv = table .table_global_value(&TableGlobalKey { catalog_name: "greptime".to_string(), schema_name: "public".to_string(), @@ -403,7 +401,10 @@ CREATE TABLE {table_name} ( .await .unwrap() .unwrap(); - let region_to_dn_map = regions_id_map + let table_id = tgv.table_id(); + + let region_to_dn_map = tgv + .regions_id_map .iter() .map(|(k, v)| (v[0], *k)) .collect::>(); diff --git a/src/datanode/src/instance/script.rs b/src/frontend/src/instance/script.rs similarity index 89% rename from src/datanode/src/instance/script.rs rename to src/frontend/src/instance/script.rs index d3eb5cb29f..fc7757a365 100644 --- a/src/datanode/src/instance/script.rs +++ b/src/frontend/src/instance/script.rs @@ -20,7 +20,7 @@ use common_telemetry::timer; use servers::query_handler::ScriptHandler; use crate::instance::Instance; -use crate::metrics; +use crate::metric; #[async_trait] impl ScriptHandler for Instance { @@ -30,7 +30,7 @@ impl ScriptHandler for Instance { name: &str, script: &str, ) -> servers::error::Result<()> { - let _timer = timer!(metrics::METRIC_HANDLE_SCRIPTS_ELAPSED); + let _timer = timer!(metric::METRIC_HANDLE_SCRIPTS_ELAPSED); self.script_executor .insert_script(schema, name, script) .await @@ -42,7 +42,7 @@ impl ScriptHandler for Instance { name: &str, params: HashMap, ) -> servers::error::Result { - let _timer = timer!(metrics::METRIC_RUN_SCRIPT_ELAPSED); + let _timer = timer!(metric::METRIC_RUN_SCRIPT_ELAPSED); self.script_executor .execute_script(schema, name, params) .await diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 71cb0ecf8f..a22c929958 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(assert_matches)] +#![feature(trait_upcasting)] pub mod catalog; pub mod datanode; @@ -22,11 +23,13 @@ pub mod frontend; pub mod grpc; pub mod influxdb; pub mod instance; +pub(crate) mod metric; pub mod mysql; pub mod opentsdb; pub mod postgres; pub mod prom; pub mod prometheus; +mod script; mod server; mod table; #[cfg(test)] diff --git a/src/frontend/src/metric.rs b/src/frontend/src/metric.rs new file mode 100644 index 0000000000..8c54526cfe --- /dev/null +++ b/src/frontend/src/metric.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) const METRIC_HANDLE_SQL_ELAPSED: &str = "frontend.handle_sql_elapsed"; +pub(crate) const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "frontend.handle_scripts_elapsed"; +pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"; diff --git a/src/datanode/src/script.rs b/src/frontend/src/script.rs similarity index 100% rename from src/datanode/src/script.rs rename to src/frontend/src/script.rs diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 9de5b8857c..80e93e4bf6 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -59,6 +59,7 @@ pub(crate) struct MockDistributedInstance { pub(crate) frontend: Arc, pub(crate) dist_instance: Arc, pub(crate) datanodes: HashMap>, + pub(crate) catalog_manager: Arc, _guards: Vec, } @@ -81,11 +82,11 @@ impl MockStandaloneInstance { pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance { let (opts, guard) = create_tmp_dir_and_datanode_opts(test_name); - let datanode_instance = DatanodeInstance::new(&opts).await.unwrap(); - datanode_instance.start().await.unwrap(); - - let frontend_instance = Instance::new_standalone(Arc::new(datanode_instance)); - + let dn_instance = Arc::new(DatanodeInstance::new(&opts).await.unwrap()); + let frontend_instance = Instance::try_new_standalone(dn_instance.clone()) + .await + .unwrap(); + dn_instance.start().await.unwrap(); MockStandaloneInstance { instance: Arc::new(frontend_instance), _guard: guard, @@ -270,26 +271,28 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistribu let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new( meta_client.clone(), )))); - let catalog_manager = Arc::new(FrontendCatalogManager::new( - meta_backend, - partition_manager, - datanode_clients.clone(), - )); + let mut catalog_manager = + FrontendCatalogManager::new(meta_backend, partition_manager, datanode_clients.clone()); wait_datanodes_alive(kv_store).await; let dist_instance = DistInstance::new( meta_client.clone(), - catalog_manager, + Arc::new(catalog_manager.clone()), datanode_clients.clone(), ); let dist_instance = Arc::new(dist_instance); - let frontend = Instance::new_distributed(dist_instance.clone()); + + catalog_manager.set_dist_instance(dist_instance.clone()); + let catalog_manager = Arc::new(catalog_manager); + + let frontend = Instance::new_distributed(catalog_manager.clone(), dist_instance.clone()).await; MockDistributedInstance { frontend: Arc::new(frontend), dist_instance, datanodes: datanode_instances, + catalog_manager, _guards: test_guards, } } diff --git a/src/script/src/lib.rs b/src/script/src/lib.rs index 3dafa54697..d7257da4fd 100644 --- a/src/script/src/lib.rs +++ b/src/script/src/lib.rs @@ -18,4 +18,4 @@ pub mod error; pub mod manager; #[cfg(feature = "python")] pub mod python; -mod table; +pub mod table; diff --git a/src/script/src/table.rs b/src/script/src/table.rs index fcd9c72473..4a38fbf218 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -205,7 +205,7 @@ impl ScriptsTable { } /// Build scripts table -fn build_scripts_schema() -> RawSchema { +pub fn build_scripts_schema() -> RawSchema { let cols = vec![ ColumnSchema::new( "schema".to_string(), diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 69b0999b23..895895e9d9 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -27,7 +27,7 @@ use datanode::datanode::{ DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, S3Config, StorageConfig, WalConfig, }; use datanode::error::{CreateTableSnafu, Result}; -use datanode::instance::{Instance, InstanceRef}; +use datanode::instance::Instance; use datanode::sql::SqlHandler; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; @@ -255,16 +255,9 @@ pub async fn create_test_table( Ok(()) } -fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance { - let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone()); - frontend_instance.set_script_handler(datanode_instance); - frontend_instance -} - pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); - instance.start().await.unwrap(); create_test_table( instance.catalog_manager(), instance.sql_handler(), @@ -272,9 +265,13 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router ) .await .unwrap(); + let frontend_instance = FeInstance::try_new_standalone(instance.clone()) + .await + .unwrap(); + instance.start().await.unwrap(); let http_server = HttpServerBuilder::new(HttpOptions::default()) .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(Arc::new( - build_frontend_instance(instance.clone()), + frontend_instance, ))) .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())) .with_metrics_handler(MetricsHandler) @@ -288,7 +285,9 @@ pub async fn setup_test_http_app_with_frontend( ) -> (Router, TestGuard) { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); - let frontend = build_frontend_instance(instance.clone()); + let frontend = FeInstance::try_new_standalone(instance.clone()) + .await + .unwrap(); instance.start().await.unwrap(); create_test_table( frontend.catalog_manager(), @@ -300,8 +299,8 @@ pub async fn setup_test_http_app_with_frontend( let frontend_ref = Arc::new(frontend); let http_server = HttpServerBuilder::new(HttpOptions::default()) .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref)) - .with_script_handler(instance.clone()) + .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone())) + .with_script_handler(frontend_ref) .build(); let app = http_server.make_app(); (app, guard) @@ -313,7 +312,9 @@ pub async fn setup_test_prom_app_with_frontend( ) -> (Router, TestGuard) { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); - let frontend = build_frontend_instance(instance.clone()); + let frontend = FeInstance::try_new_standalone(instance.clone()) + .await + .unwrap(); instance.start().await.unwrap(); create_test_table( frontend.catalog_manager(), @@ -335,7 +336,6 @@ pub async fn setup_grpc_server( let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); - instance.start().await.unwrap(); let runtime = Arc::new( RuntimeBuilder::default() @@ -347,7 +347,10 @@ pub async fn setup_grpc_server( let fe_grpc_addr = format!("127.0.0.1:{}", get_port()); - let fe_instance = frontend::instance::Instance::new_standalone(instance.clone()); + let fe_instance = FeInstance::try_new_standalone(instance.clone()) + .await + .unwrap(); + instance.start().await.unwrap(); let fe_instance_ref = Arc::new(fe_instance); let fe_grpc_server = Arc::new(GrpcServer::new( ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref), diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index a8c7b938e9..74e417b1f1 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -327,7 +327,7 @@ pub async fn test_metrics_api(store_type: StorageType) { let res = client.get("/metrics").send().await; assert_eq!(res.status(), StatusCode::OK); let body = res.text().await; - assert!(body.contains("datanode_handle_sql_elapsed")); + assert!(body.contains("frontend_handle_sql_elapsed")); guard.remove_all().await; } diff --git a/tests/cases/distributed/catalog/schema.result b/tests/cases/distributed/catalog/schema.result index 4bad27fc53..8e46924d3b 100644 --- a/tests/cases/distributed/catalog/schema.result +++ b/tests/cases/distributed/catalog/schema.result @@ -46,10 +46,11 @@ SHOW TABLES FROM test_public_schema; SHOW TABLES FROM public; -+--------+ -| Tables | -+--------+ -+--------+ ++---------+ +| Tables | ++---------+ +| scripts | ++---------+ INSERT INTO hello VALUES (2), (3), (4); @@ -90,10 +91,11 @@ SHOW TABLES FROM test_public_schema; SHOW TABLES FROM public; -+--------+ -| Tables | -+--------+ -+--------+ ++---------+ +| Tables | ++---------+ +| scripts | ++---------+ DROP SCHEMA test_public_schema; diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index e3d96c2786..d3ac5b8ce2 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -105,19 +105,23 @@ impl Env { // start a distributed GreptimeDB let mut meta_server = Env::start_server("metasrv", &db_ctx, true); - // wait for election - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let mut frontend = Env::start_server("frontend", &db_ctx, true); - let mut datanode = Env::start_server("datanode", &db_ctx, true); + if !util::check_port(METASRV_ADDR.parse().unwrap(), Duration::from_secs(10)).await { + Env::stop_server(&mut meta_server).await; + panic!("Metasrv doesn't up in 10 seconds, quit.") + } - for addr in [DATANODE_ADDR, METASRV_ADDR, SERVER_ADDR].iter() { - let is_up = util::check_port(addr.parse().unwrap(), Duration::from_secs(10)).await; - if !is_up { - Env::stop_server(&mut meta_server).await; - Env::stop_server(&mut frontend).await; - Env::stop_server(&mut datanode).await; - panic!("Server {addr} doesn't up in 10 seconds, quit.") - } + let mut datanode = Env::start_server("datanode", &db_ctx, true); + // Wait for default catalog and schema being created. + // Can be removed once #1265 resolved, and merged with Frontend start checking below. + if !util::check_port(DATANODE_ADDR.parse().unwrap(), Duration::from_secs(10)).await { + Env::stop_server(&mut datanode).await; + panic!("Datanode doesn't up in 10 seconds, quit.") + } + + let mut frontend = Env::start_server("frontend", &db_ctx, true); + if !util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(10)).await { + Env::stop_server(&mut frontend).await; + panic!("Frontend doesn't up in 10 seconds, quit.") } let client = Client::with_urls(vec![SERVER_ADDR]);