diff --git a/Cargo.lock b/Cargo.lock index f2beb85e88..8220006c62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2918,6 +2918,8 @@ dependencies = [ "partition", "prost", "query", + "rstest", + "rstest_reuse", "rustls", "script", "serde", @@ -3103,6 +3105,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.27" @@ -6185,6 +6193,44 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rstest" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de1bb486a691878cd320c2f0d319ba91eeaa2e894066d8b5f8f117c000e9d962" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version 0.4.0", +] + +[[package]] +name = "rstest_macros" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290ca1a1c8ca7edb7c3283bd44dc35dd54fdec6253a3912e201ba1072018fca8" +dependencies = [ + "cfg-if 1.0.0", + "proc-macro2", + "quote", + "rustc_version 0.4.0", + "syn 1.0.109", + "unicode-ident", +] + +[[package]] +name = "rstest_reuse" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45f80dcc84beab3a327bbe161f77db25f336a1452428176787c8c79ac79d7073" +dependencies = [ + "quote", + "rand", + "rustc_version 0.4.0", + "syn 1.0.109", +] + [[package]] name = "rust-embed" version = "6.6.1" diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 1cfa061064..01553b29c4 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -22,9 +22,7 @@ use common_telemetry::timer; use query::error::QueryExecutionSnafu; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::query_engine::StatementHandler; -use servers::error as server_error; -use servers::prom::PromHandler; -use session::context::{QueryContext, QueryContextRef}; +use session::context::QueryContextRef; use snafu::prelude::*; use sql::ast::ObjectName; use sql::statements::copy::{CopyTable, CopyTableArgument}; @@ -291,23 +289,6 @@ impl StatementHandler for Instance { } } -#[async_trait] -impl PromHandler for Instance { - async fn do_query(&self, query: &PromQuery) -> server_error::Result { - let _timer = timer!(metrics::METRIC_HANDLE_PROMQL_ELAPSED); - - self.execute_promql(query, QueryContext::arc()) - .await - .map_err(BoxedError::new) - .with_context(|_| { - let query_literal = format!("{query:?}"); - server_error::ExecuteQuerySnafu { - query: query_literal, - } - }) - } -} - #[cfg(test)] mod test { use std::sync::Arc; diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index d38ecb2b58..10c7ca8459 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -12,6 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -// TODO(LFC): These tests should be moved to frontend crate. They are actually standalone instance tests. -mod promql_test; pub(crate) mod test_util; diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 220010ccf3..ed3ff40cba 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -16,19 +16,13 @@ use std::sync::Arc; use std::time::Duration; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; -use common_query::Output; -use common_recordbatch::util; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use mito::config::EngineConfig; use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; -use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use servers::Mode; -use session::context::QueryContext; use snafu::ResultExt; -use sql::statements::statement::Statement; -use sql::statements::tql::Tql; use table::engine::{EngineContext, TableEngineRef}; use table::requests::{CreateTableRequest, TableOptions}; @@ -80,40 +74,6 @@ impl MockInstance { } } - pub(crate) async fn execute_sql(&self, sql: &str) -> Output { - let engine = self.inner().query_engine(); - let planner = engine.planner(); - - let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); - match stmt { - QueryStatement::Sql(Statement::Query(_)) => { - let plan = planner.plan(stmt, QueryContext::arc()).await.unwrap(); - engine.execute(plan, QueryContext::arc()).await.unwrap() - } - QueryStatement::Sql(Statement::Tql(tql)) => { - let plan = match tql { - Tql::Eval(eval) => { - let promql = PromQuery { - start: eval.start, - end: eval.end, - step: eval.step, - query: eval.query, - }; - let stmt = QueryLanguageParser::parse_promql(&promql).unwrap(); - planner.plan(stmt, QueryContext::arc()).await.unwrap() - } - Tql::Explain(_) => unimplemented!(), - }; - engine.execute(plan, QueryContext::arc()).await.unwrap() - } - _ => self - .inner() - .execute_stmt(stmt, QueryContext::arc()) - .await - .unwrap(), - } - } - pub(crate) fn inner(&self) -> &Instance { &self.instance } @@ -207,29 +167,3 @@ pub async fn create_mock_sql_handler() -> SqlHandler { ); SqlHandler::new(mock_engine.clone(), catalog_manager, mock_engine, None) } - -pub(crate) async fn setup_test_instance(test_name: &str) -> MockInstance { - MockInstance::new(test_name).await -} - -pub async fn check_unordered_output_stream(output: Output, expected: String) { - let sort_table = |table: String| -> String { - let replaced = table.replace("\\n", "\n"); - let mut lines = replaced.split('\n').collect::>(); - lines.sort(); - lines - .into_iter() - .map(|s| s.to_string()) - .reduce(|acc, e| format!("{acc}\\n{e}")) - .unwrap() - }; - - let recordbatches = match output { - Output::Stream(stream) => util::collect_batches(stream).await.unwrap(), - Output::RecordBatches(recordbatches) => recordbatches, - _ => unreachable!(), - }; - let pretty_print = sort_table(recordbatches.pretty_print().unwrap()); - let expected = sort_table(expected); - assert_eq!(pretty_print, expected); -} diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index a2ebdbbdca..b9a12b1027 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -59,6 +59,8 @@ common-test-util = { path = "../common/test-util" } datanode = { path = "../datanode" } futures = "0.3" meta-srv = { path = "../meta-srv", features = ["mock"] } +rstest = "0.17" +rstest_reuse = "0.5" strfmt = "0.2" toml = "0.5" tower = "0.4" diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index d1f2f28c99..19b4a87f7d 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -52,14 +52,15 @@ use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; use query::query_engine::StatementHandlerRef; use query::{QueryEngineFactory, QueryEngineRef}; use servers::error as server_error; +use servers::error::{ExecuteQuerySnafu, ParsePromQLSnafu}; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; -use servers::prom::{PromHandler, PromHandlerRef}; +use servers::prom::PromHandler; use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler, }; -use session::context::QueryContextRef; +use session::context::{QueryContext, QueryContextRef}; use snafu::prelude::*; use sql::dialect::GenericDialect; use sql::parser::ParserContext; @@ -108,7 +109,6 @@ pub struct Instance { statement_handler: StatementHandlerRef, query_engine: QueryEngineRef, grpc_query_handler: GrpcQueryHandlerRef, - promql_handler: Option, create_expr_factory: CreateExprFactoryRef, @@ -160,7 +160,6 @@ impl Instance { statement_handler: dist_instance.clone(), query_engine, grpc_query_handler: dist_instance, - promql_handler: None, plugins: plugins.clone(), servers: Arc::new(HashMap::new()), }) @@ -208,7 +207,6 @@ impl Instance { statement_handler: dn_instance.clone(), query_engine, grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()), - promql_handler: Some(dn_instance.clone()), plugins: Default::default(), servers: Arc::new(HashMap::new()), }) @@ -243,7 +241,6 @@ impl Instance { query_engine, create_expr_factory: Arc::new(DefaultCreateExprFactory), grpc_query_handler: dist_instance, - promql_handler: None, plugins: Default::default(), servers: Arc::new(HashMap::new()), } @@ -438,10 +435,14 @@ fn parse_stmt(sql: &str) -> Result> { } impl Instance { - async fn plan_exec(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { + pub(crate) async fn plan_exec( + &self, + stmt: QueryStatement, + query_ctx: QueryContextRef, + ) -> Result { let planner = self.query_engine.planner(); let plan = planner - .plan(QueryStatement::Sql(stmt), query_ctx.clone()) + .plan(stmt, query_ctx.clone()) .await .context(PlanStatementSnafu)?; self.query_engine @@ -500,13 +501,13 @@ impl Instance { match stmt { Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { - self.plan_exec(stmt, query_ctx).await + self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await } // For performance consideration, only "insert with select" is executed by query engine. // Plain insert ("insert with values") is still executed directly in statement. Statement::Insert(ref insert) if insert.is_insert_select() => { - self.plan_exec(stmt, query_ctx).await + self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await } Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await, @@ -582,20 +583,13 @@ impl SqlQueryHandler for Instance { } async fn do_promql_query(&self, query: &PromQuery, _: QueryContextRef) -> Vec> { - if let Some(handler) = &self.promql_handler { - let result = handler.do_query(query).await.with_context(|_| { - let query_literal = format!("{query:?}"); - ExecutePromqlSnafu { - query: query_literal, - } - }); - vec![result] - } else { - vec![Err(NotSupportedSnafu { - feat: "PromQL Query", - } - .build())] - } + let result = + PromHandler::do_query(self, query) + .await + .with_context(|_| ExecutePromqlSnafu { + query: format!("{query:?}"), + }); + vec![result] } async fn do_describe( @@ -631,14 +625,15 @@ impl SqlQueryHandler for Instance { #[async_trait] impl PromHandler for Instance { async fn do_query(&self, query: &PromQuery) -> server_error::Result { - if let Some(promql_handler) = &self.promql_handler { - promql_handler.do_query(query).await - } else { - server_error::NotSupportedSnafu { - feat: "PromQL query in Frontend", - } - .fail() - } + let stmt = QueryLanguageParser::parse_promql(query).with_context(|_| ParsePromQLSnafu { + query: query.clone(), + })?; + self.plan_exec(stmt, QueryContext::arc()) + .await + .map_err(BoxedError::new) + .with_context(|_| ExecuteQuerySnafu { + query: format!("{query:?}"), + }) } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index a22c929958..db936c069e 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -34,3 +34,8 @@ mod server; mod table; #[cfg(test)] mod tests; + +#[cfg(test)] +// allowed because https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate +#[allow(clippy::single_component_path_imports)] +use rstest_reuse; diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 80e93e4bf6..ccd2d9e3cd 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -13,6 +13,7 @@ // limitations under the License. mod instance_test; +mod promql_test; mod test_util; use std::collections::HashMap; diff --git a/src/frontend/src/tests/instance_test.rs b/src/frontend/src/tests/instance_test.rs index 6ce12e638e..e2bb4083e6 100644 --- a/src/frontend/src/tests/instance_test.rs +++ b/src/frontend/src/tests/instance_test.rs @@ -20,16 +20,21 @@ use common_query::Output; use common_recordbatch::util; use common_telemetry::logging; use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef}; +use rstest::rstest; +use rstest_reuse::apply; use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContext; use crate::error::Error; -use crate::tests::test_util::check_output_stream; -use crate::tests::{create_standalone_instance, MockStandaloneInstance}; +use crate::instance::Instance; +use crate::tests::test_util::{ + both_instances_cases, check_output_stream, check_unordered_output_stream, distributed, + standalone, standalone_instance_case, MockInstance, +}; -#[tokio::test(flavor = "multi_thread")] -async fn test_create_database_and_insert_query() { - let instance = create_standalone_instance("create_database_and_insert_query").await; +#[apply(both_instances_cases)] +async fn test_create_database_and_insert_query(instance: Arc) { + let instance = instance.frontend(); let output = execute_sql(&instance, "create database test").await; assert!(matches!(output, Output::AffectedRows(1))); @@ -74,10 +79,9 @@ async fn test_create_database_and_insert_query() { } } -#[tokio::test(flavor = "multi_thread")] -async fn test_issue477_same_table_name_in_different_databases() { - let instance = - create_standalone_instance("test_issue477_same_table_name_in_different_databases").await; +#[apply(both_instances_cases)] +async fn test_issue477_same_table_name_in_different_databases(instance: Arc) { + let instance = instance.frontend(); // Create database a and b let output = execute_sql(&instance, "create database a").await; @@ -144,7 +148,7 @@ async fn test_issue477_same_table_name_in_different_databases() { .await; } -async fn assert_query_result(instance: &MockStandaloneInstance, sql: &str, ts: i64, host: &str) { +async fn assert_query_result(instance: &Arc, sql: &str, ts: i64, host: &str) { let query_output = execute_sql(instance, sql).await; match query_output { Output::Stream(s) => { @@ -164,9 +168,9 @@ async fn assert_query_result(instance: &MockStandaloneInstance, sql: &str, ts: i } } -#[tokio::test(flavor = "multi_thread")] -async fn test_execute_insert() { - let instance = create_standalone_instance("test_execute_insert").await; +#[apply(both_instances_cases)] +async fn test_execute_insert(instance: Arc) { + let instance = instance.frontend(); // create table execute_sql( @@ -186,9 +190,9 @@ async fn test_execute_insert() { assert!(matches!(output, Output::AffectedRows(2))); } -#[tokio::test(flavor = "multi_thread")] -async fn test_execute_insert_by_select() { - let instance = create_standalone_instance("test_execute_insert_by_select").await; +#[apply(both_instances_cases)] +async fn test_execute_insert_by_select(instance: Arc) { + let instance = instance.frontend(); // create table execute_sql( @@ -247,9 +251,9 @@ async fn test_execute_insert_by_select() { check_output_stream(output, expected).await; } -#[tokio::test(flavor = "multi_thread")] -async fn test_execute_insert_query_with_i64_timestamp() { - let instance = create_standalone_instance("insert_query_i64_timestamp").await; +#[apply(both_instances_cases)] +async fn test_execute_insert_query_with_i64_timestamp(instance: Arc) { + let instance = instance.frontend(); execute_sql( &instance, @@ -299,9 +303,9 @@ async fn test_execute_insert_query_with_i64_timestamp() { } } -#[tokio::test(flavor = "multi_thread")] -async fn test_execute_query() { - let instance = create_standalone_instance("execute_query").await; +#[apply(standalone_instance_case)] +async fn test_execute_query(instance: Arc) { + let instance = instance.frontend(); let output = execute_sql(&instance, "select sum(number) from numbers limit 20").await; match output { @@ -319,9 +323,11 @@ async fn test_execute_query() { } } -#[tokio::test(flavor = "multi_thread")] -async fn test_execute_show_databases_tables() { - let instance = create_standalone_instance("execute_show_databases_tables").await; +#[apply(both_instances_cases)] +async fn test_execute_show_databases_tables(instance: Arc) { + let is_distributed_mode = instance.is_distributed_mode(); + + let instance = instance.frontend(); let output = execute_sql(&instance, "show databases").await; match output { @@ -353,15 +359,26 @@ async fn test_execute_show_databases_tables() { _ => unreachable!(), } + let expected = if is_distributed_mode { + "\ ++---------+ +| Tables | ++---------+ +| scripts | ++---------+\ +" + } else { + "\ ++---------+ +| Tables | ++---------+ +| numbers | +| scripts | ++---------+\ +" + }; let output = execute_sql(&instance, "show tables").await; - match output { - Output::RecordBatches(databases) => { - let databases = databases.take(); - assert_eq!(1, databases[0].num_columns()); - assert_eq!(databases[0].column(0).len(), 2); - } - _ => unreachable!(), - } + check_unordered_output_stream(output, expected).await; execute_sql( &instance, @@ -369,14 +386,27 @@ async fn test_execute_show_databases_tables() { ).await; let output = execute_sql(&instance, "show tables").await; - match output { - Output::RecordBatches(databases) => { - let databases = databases.take(); - assert_eq!(1, databases[0].num_columns()); - assert_eq!(databases[0].column(0).len(), 3); - } - _ => unreachable!(), - } + let expected = if is_distributed_mode { + "\ ++---------+ +| Tables | ++---------+ +| demo | +| scripts | ++---------+\ +" + } else { + "\ ++---------+ +| Tables | ++---------+ +| demo | +| numbers | +| scripts | ++---------+\ +" + }; + check_unordered_output_stream(output, expected).await; // show tables like [string] let output = execute_sql(&instance, "show tables like 'de%'").await; @@ -395,9 +425,9 @@ async fn test_execute_show_databases_tables() { } } -#[tokio::test(flavor = "multi_thread")] -pub async fn test_execute_create() { - let instance = create_standalone_instance("execute_create").await; +#[apply(both_instances_cases)] +async fn test_execute_create(instance: Arc) { + let instance = instance.frontend(); let output = execute_sql( &instance, @@ -414,9 +444,9 @@ pub async fn test_execute_create() { assert!(matches!(output, Output::AffectedRows(0))); } -#[tokio::test] -async fn test_rename_table() { - let instance = create_standalone_instance("test_rename_table_local").await; +#[apply(standalone_instance_case)] +async fn test_rename_table(instance: Arc) { + let instance = instance.frontend(); let output = execute_sql(&instance, "create database db").await; assert!(matches!(output, Output::AffectedRows(1))); @@ -470,9 +500,10 @@ async fn test_rename_table() { .expect_err("no table found in expect"); } -#[tokio::test] -async fn test_create_table_after_rename_table() { - let instance = create_standalone_instance("test_rename_table_local").await; +// should apply to both instances. tracked in #723 +#[apply(standalone_instance_case)] +async fn test_create_table_after_rename_table(instance: Arc) { + let instance = instance.frontend(); let output = execute_sql(&instance, "create database db").await; assert!(matches!(output, Output::AffectedRows(1))); @@ -520,9 +551,9 @@ async fn test_create_table_after_rename_table() { check_output_stream(output, expect).await; } -#[tokio::test(flavor = "multi_thread")] -async fn test_alter_table() { - let instance = create_standalone_instance("test_alter_table").await; +#[apply(both_instances_cases)] +async fn test_alter_table(instance: Arc) { + let instance = instance.frontend(); // create table execute_sql( @@ -608,11 +639,10 @@ async fn test_alter_table() { check_output_stream(output, expected).await; } -async fn test_insert_with_default_value_for_type(type_name: &str) { - let instance = create_standalone_instance("execute_create").await; - +async fn test_insert_with_default_value_for_type(instance: Arc, type_name: &str) { + let table_name = format!("test_table_with_{type_name}"); let create_sql = format!( - r#"create table test_table( + r#"create table {table_name}( host string, ts {type_name} DEFAULT CURRENT_TIMESTAMP, cpu double default 0, @@ -626,7 +656,7 @@ async fn test_insert_with_default_value_for_type(type_name: &str) { // Insert with ts. let output = execute_sql( &instance, - "insert into test_table(host, cpu, ts) values ('host1', 1.1, 1000)", + &format!("insert into {table_name}(host, cpu, ts) values ('host1', 1.1, 1000)"), ) .await; assert!(matches!(output, Output::AffectedRows(1))); @@ -634,12 +664,12 @@ async fn test_insert_with_default_value_for_type(type_name: &str) { // Insert without ts, so it should be filled by default value. let output = execute_sql( &instance, - "insert into test_table(host, cpu) values ('host2', 2.2)", + &format!("insert into {table_name}(host, cpu) values ('host2', 2.2)"), ) .await; assert!(matches!(output, Output::AffectedRows(1))); - let output = execute_sql(&instance, "select host, cpu from test_table").await; + let output = execute_sql(&instance, &format!("select host, cpu from {table_name}")).await; let expected = "\ +-------+-----+ | host | cpu | @@ -652,15 +682,17 @@ async fn test_insert_with_default_value_for_type(type_name: &str) { check_output_stream(output, expected).await; } -#[tokio::test(flavor = "multi_thread")] -async fn test_insert_with_default_value() { - test_insert_with_default_value_for_type("timestamp").await; - test_insert_with_default_value_for_type("bigint").await; +// should apply to both instances. tracked in #1293 +#[apply(standalone_instance_case)] +async fn test_insert_with_default_value(instance: Arc) { + test_insert_with_default_value_for_type(instance.frontend(), "timestamp").await; + test_insert_with_default_value_for_type(instance.frontend(), "bigint").await; } -#[tokio::test(flavor = "multi_thread")] -async fn test_use_database() { - let instance = create_standalone_instance("test_use_database").await; +// should apply to both instance. tracked in #1294 +#[apply(standalone_instance_case)] +async fn test_use_database(instance: Arc) { + let instance = instance.frontend(); let output = execute_sql(&instance, "create database db1").await; assert!(matches!(output, Output::AffectedRows(1))); @@ -717,9 +749,10 @@ async fn test_use_database() { check_output_stream(output, expected).await; } -#[tokio::test(flavor = "multi_thread")] -async fn test_delete() { - let instance = create_standalone_instance("test_delete").await; +// should apply to both instances. tracked in #755 +#[apply(standalone_instance_case)] +async fn test_delete(instance: Arc) { + let instance = instance.frontend(); let output = execute_sql( &instance, @@ -766,12 +799,11 @@ async fn test_delete() { check_output_stream(output, expect).await; } -#[tokio::test(flavor = "multi_thread")] -async fn test_execute_copy_to_s3() { - logging::init_default_ut_logging(); +#[apply(standalone_instance_case)] +async fn test_execute_copy_to_s3(instance: Arc) { if let Ok(bucket) = env::var("GT_S3_BUCKET") { if !bucket.is_empty() { - let instance = create_standalone_instance("test_execute_copy_to_s3").await; + let instance = instance.frontend(); // setups execute_sql( @@ -805,12 +837,12 @@ async fn test_execute_copy_to_s3() { } } -#[tokio::test(flavor = "multi_thread")] -async fn test_execute_copy_from_s3() { +#[apply(standalone_instance_case)] +async fn test_execute_copy_from_s3(instance: Arc) { logging::init_default_ut_logging(); if let Ok(bucket) = env::var("GT_S3_BUCKET") { if !bucket.is_empty() { - let instance = create_standalone_instance("test_execute_copy_from_s3").await; + let instance = instance.frontend(); // setups execute_sql( @@ -905,26 +937,26 @@ async fn test_execute_copy_from_s3() { } } -async fn execute_sql(instance: &MockStandaloneInstance, sql: &str) -> Output { +async fn execute_sql(instance: &Arc, sql: &str) -> Output { execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await } async fn try_execute_sql( - instance: &MockStandaloneInstance, + instance: &Arc, sql: &str, ) -> Result { try_execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await } async fn try_execute_sql_in_db( - instance: &MockStandaloneInstance, + instance: &Arc, sql: &str, db: &str, ) -> Result { let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db)); - instance.instance.do_query(sql, query_ctx).await.remove(0) + instance.do_query(sql, query_ctx).await.remove(0) } -async fn execute_sql_in_db(instance: &MockStandaloneInstance, sql: &str, db: &str) -> Output { +async fn execute_sql_in_db(instance: &Arc, sql: &str, db: &str) -> Output { try_execute_sql_in_db(instance, sql, db).await.unwrap() } diff --git a/src/datanode/src/tests/promql_test.rs b/src/frontend/src/tests/promql_test.rs similarity index 80% rename from src/datanode/src/tests/promql_test.rs rename to src/frontend/src/tests/promql_test.rs index a14bb16ea2..ab359de37a 100644 --- a/src/datanode/src/tests/promql_test.rs +++ b/src/frontend/src/tests/promql_test.rs @@ -12,15 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; +use rstest::rstest; +use rstest_reuse::apply; +use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContext; -use super::test_util::check_unordered_output_stream; -use crate::tests::test_util::setup_test_instance; +use super::test_util::{check_unordered_output_stream, standalone, standalone_instance_case}; +use crate::instance::Instance; +use crate::tests::test_util::MockInstance; #[allow(clippy::too_many_arguments)] async fn create_insert_query_assert( + instance: Arc, create: &str, insert: &str, promql: &str, @@ -30,37 +37,54 @@ async fn create_insert_query_assert( lookback: Duration, expected: &str, ) { - let instance = setup_test_instance("test_execute_insert").await; + instance.do_query(create, QueryContext::arc()).await; + instance.do_query(insert, QueryContext::arc()).await; - instance.execute_sql(create).await; - - instance.execute_sql(insert).await; + let query = PromQuery { + query: promql.to_string(), + start: "0".to_string(), + end: "0".to_string(), + step: "5m".to_string(), + }; + let QueryStatement::Promql(mut eval_stmt) = QueryLanguageParser::parse_promql(&query).unwrap() else { unreachable!() }; + eval_stmt.start = start; + eval_stmt.end = end; + eval_stmt.interval = interval; + eval_stmt.lookback_delta = lookback; let query_output = instance - .inner() - .execute_promql_statement(promql, start, end, interval, lookback, QueryContext::arc()) + .plan_exec(QueryStatement::Promql(eval_stmt), QueryContext::arc()) .await .unwrap(); - let expected = String::from(expected); check_unordered_output_stream(query_output, expected).await; } #[allow(clippy::too_many_arguments)] -async fn create_insert_tql_assert(create: &str, insert: &str, tql: &str, expected: &str) { - let instance = setup_test_instance("test_execute_insert").await; +async fn create_insert_tql_assert( + instance: Arc, + create: &str, + insert: &str, + tql: &str, + expected: &str, +) { + instance.do_query(create, QueryContext::arc()).await; + instance.do_query(insert, QueryContext::arc()).await; - instance.execute_sql(create).await; - - instance.execute_sql(insert).await; - - let query_output = instance.execute_sql(tql).await; - let expected = String::from(expected); + let query_output = instance + .do_query(tql, QueryContext::arc()) + .await + .remove(0) + .unwrap(); check_unordered_output_stream(query_output, expected).await; } -#[tokio::test(flavor = "multi_thread")] -async fn sql_insert_tql_query_ceil() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn sql_insert_tql_query_ceil(instance: Arc) { + let instance = instance.frontend(); + create_insert_tql_assert( + instance, r#"create table http_requests_total ( host string, cpu double, @@ -102,9 +126,13 @@ async fn sql_insert_tql_query_ceil() { .await; } -#[tokio::test(flavor = "multi_thread")] -async fn sql_insert_promql_query_ceil() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn sql_insert_promql_query_ceil(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, r#"create table http_requests_total ( host string, cpu double, @@ -181,9 +209,13 @@ fn unix_epoch_plus_100s() -> SystemTime { // eval instant at 50m SUM BY (group) (http_requests{job="api-server"}) // {group="canary"} 700 // {group="production"} 300 -#[tokio::test(flavor = "multi_thread")] -async fn aggregators_simple_sum() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn aggregators_simple_sum(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, "SUM BY (group) (http_requests{job=\"api-server\"})", @@ -205,9 +237,13 @@ async fn aggregators_simple_sum() { // eval instant at 50m avg by (group) (http_requests{job="api-server"}) // {group="canary"} 350 // {group="production"} 150 -#[tokio::test(flavor = "multi_thread")] -async fn aggregators_simple_avg() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn aggregators_simple_avg(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, "AVG BY (group) (http_requests{job=\"api-server\"})", @@ -229,9 +265,13 @@ async fn aggregators_simple_avg() { // eval instant at 50m count by (group) (http_requests{job="api-server"}) // {group="canary"} 2 // {group="production"} 2 -#[tokio::test(flavor = "multi_thread")] -async fn aggregators_simple_count() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn aggregators_simple_count(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, "COUNT BY (group) (http_requests{job=\"api-server\"})", @@ -253,9 +293,13 @@ async fn aggregators_simple_count() { // eval instant at 50m sum without (instance) (http_requests{job="api-server"}) // {group="canary",job="api-server"} 700 // {group="production",job="api-server"} 300 -#[tokio::test(flavor = "multi_thread")] -async fn aggregators_simple_without() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn aggregators_simple_without(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, "sum without (instance) (http_requests{job=\"api-server\"})", @@ -276,9 +320,13 @@ async fn aggregators_simple_without() { // # Empty by. // eval instant at 50m sum by () (http_requests{job="api-server"}) // {} 1000 -#[tokio::test(flavor = "multi_thread")] -async fn aggregators_empty_by() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn aggregators_empty_by(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, "sum by () (http_requests{job=\"api-server\"})", @@ -298,9 +346,13 @@ async fn aggregators_empty_by() { // # No by/without. // eval instant at 50m sum(http_requests{job="api-server"}) // {} 1000 -#[tokio::test(flavor = "multi_thread")] -async fn aggregators_no_by_without() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn aggregators_no_by_without(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, r#"sum (http_requests{job="api-server"})"#, @@ -321,9 +373,13 @@ async fn aggregators_no_by_without() { // eval instant at 50m sum without () (http_requests{job="api-server",group="production"}) // {group="production",job="api-server",instance="0"} 100 // {group="production",job="api-server",instance="1"} 200 -#[tokio::test(flavor = "multi_thread")] -async fn aggregators_empty_without() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn aggregators_empty_without(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, r#"sum without () (http_requests{job="api-server",group="production"})"#, @@ -345,9 +401,13 @@ async fn aggregators_empty_without() { // eval instant at 50m sum(http_requests) by (job) + min(http_requests) by (job) + max(http_requests) by (job) + avg(http_requests) by (job) // {job="app-server"} 4550 // {job="api-server"} 1750 -#[tokio::test(flavor = "multi_thread")] -async fn aggregators_complex_combined_aggrs() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn aggregators_complex_combined_aggrs(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, "sum(http_requests) by (job) + min(http_requests) by (job) + max(http_requests) by (job) + avg(http_requests) by (job)", @@ -366,9 +426,13 @@ async fn aggregators_complex_combined_aggrs() { } // This is not from prometheus test set. It's derived from `aggregators_complex_combined_aggrs()` -#[tokio::test(flavor = "multi_thread")] -async fn two_aggregators_combined_aggrs() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn two_aggregators_combined_aggrs(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, "sum(http_requests) by (job) + min(http_requests) by (job) ", @@ -389,10 +453,14 @@ async fn two_aggregators_combined_aggrs() { // eval instant at 50m stddev by (instance)(http_requests) // {instance="0"} 223.60679774998 // {instance="1"} 223.60679774998 -#[tokio::test(flavor = "multi_thread")] +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] #[ignore = "TODO(ruihang): fix this case"] -async fn stddev_by_label() { +async fn stddev_by_label(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, r#"stddev by (instance)(http_requests)"#, @@ -411,9 +479,13 @@ async fn stddev_by_label() { } // This is not derived from prometheus -#[tokio::test(flavor = "multi_thread")] -async fn binary_op_plain_columns() { +// should apply to both instances. tracked in #1296 +#[apply(standalone_instance_case)] +async fn binary_op_plain_columns(instance: Arc) { + let instance = instance.frontend(); + create_insert_query_assert( + instance, AGGREGATORS_CREATE_TABLE, AGGREGATORS_INSERT_DATA, r#"http_requests - http_requests"#, diff --git a/src/frontend/src/tests/test_util.rs b/src/frontend/src/tests/test_util.rs index 1ac010e705..534e09709e 100644 --- a/src/frontend/src/tests/test_util.rs +++ b/src/frontend/src/tests/test_util.rs @@ -12,8 +12,80 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use common_query::Output; use common_recordbatch::util; +use rstest_reuse::{self, template}; + +use crate::instance::Instance; +use crate::tests::{ + create_distributed_instance, create_standalone_instance, MockDistributedInstance, + MockStandaloneInstance, +}; + +pub(crate) trait MockInstance { + fn frontend(&self) -> Arc; + + fn is_distributed_mode(&self) -> bool; +} + +impl MockInstance for MockStandaloneInstance { + fn frontend(&self) -> Arc { + self.instance.clone() + } + + fn is_distributed_mode(&self) -> bool { + false + } +} + +impl MockInstance for MockDistributedInstance { + fn frontend(&self) -> Arc { + self.frontend.clone() + } + + fn is_distributed_mode(&self) -> bool { + true + } +} + +pub(crate) async fn standalone() -> Arc { + let test_name = uuid::Uuid::new_v4().to_string(); + let instance = create_standalone_instance(&test_name).await; + Arc::new(instance) +} + +pub(crate) async fn distributed() -> Arc { + let test_name = uuid::Uuid::new_v4().to_string(); + let instance = create_distributed_instance(&test_name).await; + Arc::new(instance) +} + +#[template] +#[rstest] +#[case::test_with_standalone(standalone())] +#[case::test_with_distributed(distributed())] +#[awt] +#[tokio::test(flavor = "multi_thread")] +pub(crate) fn both_instances_cases( + #[future] + #[case] + instance: Arc, +) { +} + +#[template] +#[rstest] +#[case::test_with_standalone(standalone())] +#[awt] +#[tokio::test(flavor = "multi_thread")] +pub(crate) fn standalone_instance_case( + #[future] + #[case] + instance: Arc, +) { +} pub(crate) async fn check_output_stream(output: Output, expected: String) { let recordbatches = match output { @@ -24,3 +96,25 @@ pub(crate) async fn check_output_stream(output: Output, expected: String) { let pretty_print = recordbatches.pretty_print().unwrap(); assert_eq!(pretty_print, expected, "{}", pretty_print); } + +pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str) { + let sort_table = |table: &str| -> String { + let replaced = table.replace("\\n", "\n"); + let mut lines = replaced.split('\n').collect::>(); + lines.sort(); + lines + .into_iter() + .map(|s| s.to_string()) + .reduce(|acc, e| format!("{acc}\\n{e}")) + .unwrap() + }; + + let recordbatches = match output { + Output::Stream(stream) => util::collect_batches(stream).await.unwrap(), + Output::RecordBatches(recordbatches) => recordbatches, + _ => unreachable!(), + }; + let pretty_print = sort_table(&recordbatches.pretty_print().unwrap()); + let expected = sort_table(expected); + assert_eq!(pretty_print, expected); +} diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 910303db81..b9ab4b5ab2 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -22,6 +22,7 @@ use axum::{http, Json}; use base64::DecodeError; use catalog; use common_error::prelude::*; +use query::parser::PromQuery; use serde_json::json; use tonic::codegen::http::{HeaderMap, HeaderValue}; use tonic::metadata::MetadataMap; @@ -281,6 +282,13 @@ pub enum Error { source: http::Error, backtrace: Backtrace, }, + + #[snafu(display("Failed to parse PromQL: {query:?}, source: {source}"))] + ParsePromQL { + query: PromQuery, + #[snafu(backtrace)] + source: query::error::Error, + }, } pub type Result = std::result::Result; @@ -346,6 +354,8 @@ impl ErrorExt for Error { #[cfg(feature = "mem-prof")] DumpProfileData { source, .. } => source.status_code(), InvalidFlushArgument { .. } => StatusCode::InvalidArguments, + + ParsePromQL { source, .. } => source.status_code(), } }