refactor: move PromQL execution to Frontend (#1297)

Author: LFC
Date: 2023-04-03 11:34:03 +08:00 (committed by GitHub)
Parent: a82f1f564d
Commit: 215cea151f
12 changed files with 415 additions and 245 deletions

Cargo.lock (generated, 46 lines changed)

@@ -2918,6 +2918,8 @@ dependencies = [
"partition",
"prost",
"query",
"rstest",
"rstest_reuse",
"rustls",
"script",
"serde",
@@ -3103,6 +3105,12 @@ version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.27"
@@ -6185,6 +6193,44 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rstest"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de1bb486a691878cd320c2f0d319ba91eeaa2e894066d8b5f8f117c000e9d962"
dependencies = [
"futures",
"futures-timer",
"rstest_macros",
"rustc_version 0.4.0",
]
[[package]]
name = "rstest_macros"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290ca1a1c8ca7edb7c3283bd44dc35dd54fdec6253a3912e201ba1072018fca8"
dependencies = [
"cfg-if 1.0.0",
"proc-macro2",
"quote",
"rustc_version 0.4.0",
"syn 1.0.109",
"unicode-ident",
]
[[package]]
name = "rstest_reuse"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45f80dcc84beab3a327bbe161f77db25f336a1452428176787c8c79ac79d7073"
dependencies = [
"quote",
"rand",
"rustc_version 0.4.0",
"syn 1.0.109",
]
[[package]]
name = "rust-embed"
version = "6.6.1"


@@ -22,9 +22,7 @@ use common_telemetry::timer;
use query::error::QueryExecutionSnafu;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::StatementHandler;
use servers::error as server_error;
use servers::prom::PromHandler;
use session::context::{QueryContext, QueryContextRef};
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::copy::{CopyTable, CopyTableArgument};
@@ -291,23 +289,6 @@ impl StatementHandler for Instance {
}
}
#[async_trait]
impl PromHandler for Instance {
async fn do_query(&self, query: &PromQuery) -> server_error::Result<Output> {
let _timer = timer!(metrics::METRIC_HANDLE_PROMQL_ELAPSED);
self.execute_promql(query, QueryContext::arc())
.await
.map_err(BoxedError::new)
.with_context(|_| {
let query_literal = format!("{query:?}");
server_error::ExecuteQuerySnafu {
query: query_literal,
}
})
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;


@@ -12,6 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(LFC): These tests should be moved to frontend crate. They are actually standalone instance tests.
mod promql_test;
pub(crate) mod test_util;


@@ -16,19 +16,13 @@ use std::sync::Arc;
use std::time::Duration;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_query::Output;
use common_recordbatch::util;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use mito::config::EngineConfig;
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use servers::Mode;
use session::context::QueryContext;
use snafu::ResultExt;
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::{CreateTableRequest, TableOptions};
@@ -80,40 +74,6 @@ impl MockInstance {
}
}
pub(crate) async fn execute_sql(&self, sql: &str) -> Output {
let engine = self.inner().query_engine();
let planner = engine.planner();
let stmt = QueryLanguageParser::parse_sql(sql).unwrap();
match stmt {
QueryStatement::Sql(Statement::Query(_)) => {
let plan = planner.plan(stmt, QueryContext::arc()).await.unwrap();
engine.execute(plan, QueryContext::arc()).await.unwrap()
}
QueryStatement::Sql(Statement::Tql(tql)) => {
let plan = match tql {
Tql::Eval(eval) => {
let promql = PromQuery {
start: eval.start,
end: eval.end,
step: eval.step,
query: eval.query,
};
let stmt = QueryLanguageParser::parse_promql(&promql).unwrap();
planner.plan(stmt, QueryContext::arc()).await.unwrap()
}
Tql::Explain(_) => unimplemented!(),
};
engine.execute(plan, QueryContext::arc()).await.unwrap()
}
_ => self
.inner()
.execute_stmt(stmt, QueryContext::arc())
.await
.unwrap(),
}
}
pub(crate) fn inner(&self) -> &Instance {
&self.instance
}
@@ -207,29 +167,3 @@ pub async fn create_mock_sql_handler() -> SqlHandler {
);
SqlHandler::new(mock_engine.clone(), catalog_manager, mock_engine, None)
}
pub(crate) async fn setup_test_instance(test_name: &str) -> MockInstance {
MockInstance::new(test_name).await
}
pub async fn check_unordered_output_stream(output: Output, expected: String) {
let sort_table = |table: String| -> String {
let replaced = table.replace("\\n", "\n");
let mut lines = replaced.split('\n').collect::<Vec<_>>();
lines.sort();
lines
.into_iter()
.map(|s| s.to_string())
.reduce(|acc, e| format!("{acc}\\n{e}"))
.unwrap()
};
let recordbatches = match output {
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};
let pretty_print = sort_table(recordbatches.pretty_print().unwrap());
let expected = sort_table(expected);
assert_eq!(pretty_print, expected);
}


@@ -59,6 +59,8 @@ common-test-util = { path = "../common/test-util" }
datanode = { path = "../datanode" }
futures = "0.3"
meta-srv = { path = "../meta-srv", features = ["mock"] }
rstest = "0.17"
rstest_reuse = "0.5"
strfmt = "0.2"
toml = "0.5"
tower = "0.4"


@@ -52,14 +52,15 @@ use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::StatementHandlerRef;
use query::{QueryEngineFactory, QueryEngineRef};
use servers::error as server_error;
use servers::error::{ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::prom::{PromHandler, PromHandlerRef};
use servers::prom::PromHandler;
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler,
};
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
@@ -108,7 +109,6 @@ pub struct Instance {
statement_handler: StatementHandlerRef,
query_engine: QueryEngineRef,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
promql_handler: Option<PromHandlerRef>,
create_expr_factory: CreateExprFactoryRef,
@@ -160,7 +160,6 @@ impl Instance {
statement_handler: dist_instance.clone(),
query_engine,
grpc_query_handler: dist_instance,
promql_handler: None,
plugins: plugins.clone(),
servers: Arc::new(HashMap::new()),
})
@@ -208,7 +207,6 @@ impl Instance {
statement_handler: dn_instance.clone(),
query_engine,
grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()),
promql_handler: Some(dn_instance.clone()),
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
})
@@ -243,7 +241,6 @@ impl Instance {
query_engine,
create_expr_factory: Arc::new(DefaultCreateExprFactory),
grpc_query_handler: dist_instance,
promql_handler: None,
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
}
@@ -438,10 +435,14 @@ fn parse_stmt(sql: &str) -> Result<Vec<Statement>> {
}
impl Instance {
async fn plan_exec(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
pub(crate) async fn plan_exec(
&self,
stmt: QueryStatement,
query_ctx: QueryContextRef,
) -> Result<Output> {
let planner = self.query_engine.planner();
let plan = planner
.plan(QueryStatement::Sql(stmt), query_ctx.clone())
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?;
self.query_engine
@@ -500,13 +501,13 @@ impl Instance {
match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
self.plan_exec(stmt, query_ctx).await
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
}
// For performance consideration, only "insert with select" is executed by query engine.
// Plain insert ("insert with values") is still executed directly in statement.
Statement::Insert(ref insert) if insert.is_insert_select() => {
self.plan_exec(stmt, query_ctx).await
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
}
Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
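The comment above encodes a routing rule worth spelling out: statements that need planning (selects, explains, deletes, and insert-from-select) go through plan_exec, while plain value inserts keep the direct statement path. A self-contained toy of that dispatch (the enum and route names are stand-ins for illustration, not the frontend's actual types):

// Toy model of the statement routing described above.
enum Statement {
    Query,        // SELECT ...               -> query engine
    InsertSelect, // INSERT INTO t SELECT ... -> query engine
    InsertValues, // INSERT INTO t VALUES ... -> direct execution
    Tql,          // TQL EVAL ...             -> PromQL path
}

fn route(stmt: &Statement) -> &'static str {
    match stmt {
        Statement::Query | Statement::InsertSelect => "plan_exec (query engine)",
        Statement::InsertValues => "execute directly in the statement handler",
        Statement::Tql => "execute_tql",
    }
}

fn main() {
    assert_eq!(route(&Statement::InsertValues), "execute directly in the statement handler");
    assert_eq!(route(&Statement::InsertSelect), "plan_exec (query engine)");
}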
@@ -582,20 +583,13 @@ impl SqlQueryHandler for Instance {
}
async fn do_promql_query(&self, query: &PromQuery, _: QueryContextRef) -> Vec<Result<Output>> {
if let Some(handler) = &self.promql_handler {
let result = handler.do_query(query).await.with_context(|_| {
let query_literal = format!("{query:?}");
ExecutePromqlSnafu {
query: query_literal,
}
});
vec![result]
} else {
vec![Err(NotSupportedSnafu {
feat: "PromQL Query",
}
.build())]
}
let result =
PromHandler::do_query(self, query)
.await
.with_context(|_| ExecutePromqlSnafu {
query: format!("{query:?}"),
});
vec![result]
}
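Note the fully qualified PromHandler::do_query(self, query): Instance implements both SqlQueryHandler and PromHandler, and each trait declares a do_query, so plain method syntax would be ambiguous. A compact, self-contained illustration with toy traits:

// Two traits with a same-named method on one type: qualify the trait to pick one.
trait SqlHandler {
    fn do_query(&self) -> &'static str {
        "sql"
    }
}
trait PromHandler {
    fn do_query(&self) -> &'static str {
        "promql"
    }
}

struct Instance;
impl SqlHandler for Instance {}
impl PromHandler for Instance {}

fn main() {
    let i = Instance;
    // `i.do_query()` would not compile here; name the trait explicitly.
    assert_eq!(SqlHandler::do_query(&i), "sql");
    assert_eq!(PromHandler::do_query(&i), "promql");
}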
async fn do_describe(
@@ -631,14 +625,15 @@ impl SqlQueryHandler for Instance {
#[async_trait]
impl PromHandler for Instance {
async fn do_query(&self, query: &PromQuery) -> server_error::Result<Output> {
if let Some(promql_handler) = &self.promql_handler {
promql_handler.do_query(query).await
} else {
server_error::NotSupportedSnafu {
feat: "PromQL query in Frontend",
}
.fail()
}
let stmt = QueryLanguageParser::parse_promql(query).with_context(|_| ParsePromQLSnafu {
query: query.clone(),
})?;
self.plan_exec(stmt, QueryContext::arc())
.await
.map_err(BoxedError::new)
.with_context(|_| ExecuteQuerySnafu {
query: format!("{query:?}"),
})
}
}
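With the standalone-only promql_handler field gone, the PromQL path is simply parse-then-plan: parse_promql turns the PromQuery into a QueryStatement, and the same plan_exec used for SQL executes it. A minimal sketch of that two-step shape with stub types (the real parser, planner, and error types live in the query and servers crates):

// Stub types standing in for PromQuery / QueryStatement / Output.
#[derive(Debug)]
struct PromQuery {
    query: String,
}
enum QueryStatement {
    Promql(String),
}
struct Output(String);

// Parse step: failure corresponds to the ParsePromQL error above.
fn parse_promql(q: &PromQuery) -> Result<QueryStatement, String> {
    if q.query.is_empty() {
        return Err(format!("Failed to parse PromQL: {q:?}"));
    }
    Ok(QueryStatement::Promql(q.query.clone()))
}

// Plan/execute step: shared with the SQL path; failure maps to ExecuteQuery.
fn plan_exec(stmt: QueryStatement) -> Result<Output, String> {
    let QueryStatement::Promql(expr) = stmt;
    Ok(Output(format!("executed: {expr}")))
}

fn do_query(q: &PromQuery) -> Result<Output, String> {
    let stmt = parse_promql(q)?;
    plan_exec(stmt)
}

fn main() {
    let out = do_query(&PromQuery { query: "up".to_string() }).unwrap();
    assert_eq!(out.0, "executed: up");
}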


@@ -34,3 +34,8 @@ mod server;
mod table;
#[cfg(test)]
mod tests;
#[cfg(test)]
// allowed because https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate
#[allow(clippy::single_component_path_imports)]
use rstest_reuse;


@@ -13,6 +13,7 @@
// limitations under the License.
mod instance_test;
mod promql_test;
mod test_util;
use std::collections::HashMap;


@@ -20,16 +20,21 @@ use common_query::Output;
use common_recordbatch::util;
use common_telemetry::logging;
use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
use rstest::rstest;
use rstest_reuse::apply;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use crate::error::Error;
use crate::tests::test_util::check_output_stream;
use crate::tests::{create_standalone_instance, MockStandaloneInstance};
use crate::instance::Instance;
use crate::tests::test_util::{
both_instances_cases, check_output_stream, check_unordered_output_stream, distributed,
standalone, standalone_instance_case, MockInstance,
};
#[tokio::test(flavor = "multi_thread")]
async fn test_create_database_and_insert_query() {
let instance = create_standalone_instance("create_database_and_insert_query").await;
#[apply(both_instances_cases)]
async fn test_create_database_and_insert_query(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let output = execute_sql(&instance, "create database test").await;
assert!(matches!(output, Output::AffectedRows(1)));
@@ -74,10 +79,9 @@ async fn test_create_database_and_insert_query() {
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_issue477_same_table_name_in_different_databases() {
let instance =
create_standalone_instance("test_issue477_same_table_name_in_different_databases").await;
#[apply(both_instances_cases)]
async fn test_issue477_same_table_name_in_different_databases(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
// Create database a and b
let output = execute_sql(&instance, "create database a").await;
@@ -144,7 +148,7 @@ async fn test_issue477_same_table_name_in_different_databases() {
.await;
}
async fn assert_query_result(instance: &MockStandaloneInstance, sql: &str, ts: i64, host: &str) {
async fn assert_query_result(instance: &Arc<Instance>, sql: &str, ts: i64, host: &str) {
let query_output = execute_sql(instance, sql).await;
match query_output {
Output::Stream(s) => {
@@ -164,9 +168,9 @@ async fn assert_query_result(instance: &MockStandaloneInstance, sql: &str, ts: i
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert() {
let instance = create_standalone_instance("test_execute_insert").await;
#[apply(both_instances_cases)]
async fn test_execute_insert(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
// create table
execute_sql(
@@ -186,9 +190,9 @@ async fn test_execute_insert() {
assert!(matches!(output, Output::AffectedRows(2)));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert_by_select() {
let instance = create_standalone_instance("test_execute_insert_by_select").await;
#[apply(both_instances_cases)]
async fn test_execute_insert_by_select(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
// create table
execute_sql(
@@ -247,9 +251,9 @@ async fn test_execute_insert_by_select() {
check_output_stream(output, expected).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert_query_with_i64_timestamp() {
let instance = create_standalone_instance("insert_query_i64_timestamp").await;
#[apply(both_instances_cases)]
async fn test_execute_insert_query_with_i64_timestamp(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
execute_sql(
&instance,
@@ -299,9 +303,9 @@ async fn test_execute_insert_query_with_i64_timestamp() {
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_query() {
let instance = create_standalone_instance("execute_query").await;
#[apply(standalone_instance_case)]
async fn test_execute_query(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let output = execute_sql(&instance, "select sum(number) from numbers limit 20").await;
match output {
@@ -319,9 +323,11 @@ async fn test_execute_query() {
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_show_databases_tables() {
let instance = create_standalone_instance("execute_show_databases_tables").await;
#[apply(both_instances_cases)]
async fn test_execute_show_databases_tables(instance: Arc<dyn MockInstance>) {
let is_distributed_mode = instance.is_distributed_mode();
let instance = instance.frontend();
let output = execute_sql(&instance, "show databases").await;
match output {
@@ -353,15 +359,26 @@ async fn test_execute_show_databases_tables() {
_ => unreachable!(),
}
let expected = if is_distributed_mode {
"\
+---------+
| Tables |
+---------+
| scripts |
+---------+\
"
} else {
"\
+---------+
| Tables |
+---------+
| numbers |
| scripts |
+---------+\
"
};
let output = execute_sql(&instance, "show tables").await;
match output {
Output::RecordBatches(databases) => {
let databases = databases.take();
assert_eq!(1, databases[0].num_columns());
assert_eq!(databases[0].column(0).len(), 2);
}
_ => unreachable!(),
}
check_unordered_output_stream(output, expected).await;
execute_sql(
&instance,
@@ -369,14 +386,27 @@ async fn test_execute_show_databases_tables() {
).await;
let output = execute_sql(&instance, "show tables").await;
match output {
Output::RecordBatches(databases) => {
let databases = databases.take();
assert_eq!(1, databases[0].num_columns());
assert_eq!(databases[0].column(0).len(), 3);
}
_ => unreachable!(),
}
let expected = if is_distributed_mode {
"\
+---------+
| Tables |
+---------+
| demo |
| scripts |
+---------+\
"
} else {
"\
+---------+
| Tables |
+---------+
| demo |
| numbers |
| scripts |
+---------+\
"
};
check_unordered_output_stream(output, expected).await;
// show tables like [string]
let output = execute_sql(&instance, "show tables like 'de%'").await;
@@ -395,9 +425,9 @@ async fn test_execute_show_databases_tables() {
}
}
#[tokio::test(flavor = "multi_thread")]
pub async fn test_execute_create() {
let instance = create_standalone_instance("execute_create").await;
#[apply(both_instances_cases)]
async fn test_execute_create(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let output = execute_sql(
&instance,
@@ -414,9 +444,9 @@ pub async fn test_execute_create() {
assert!(matches!(output, Output::AffectedRows(0)));
}
#[tokio::test]
async fn test_rename_table() {
let instance = create_standalone_instance("test_rename_table_local").await;
#[apply(standalone_instance_case)]
async fn test_rename_table(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let output = execute_sql(&instance, "create database db").await;
assert!(matches!(output, Output::AffectedRows(1)));
@@ -470,9 +500,10 @@ async fn test_rename_table() {
.expect_err("no table found in expect");
}
#[tokio::test]
async fn test_create_table_after_rename_table() {
let instance = create_standalone_instance("test_rename_table_local").await;
// should apply to both instances. tracked in #723
#[apply(standalone_instance_case)]
async fn test_create_table_after_rename_table(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let output = execute_sql(&instance, "create database db").await;
assert!(matches!(output, Output::AffectedRows(1)));
@@ -520,9 +551,9 @@ async fn test_create_table_after_rename_table() {
check_output_stream(output, expect).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_alter_table() {
let instance = create_standalone_instance("test_alter_table").await;
#[apply(both_instances_cases)]
async fn test_alter_table(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
// create table
execute_sql(
@@ -608,11 +639,10 @@ async fn test_alter_table() {
check_output_stream(output, expected).await;
}
async fn test_insert_with_default_value_for_type(type_name: &str) {
let instance = create_standalone_instance("execute_create").await;
async fn test_insert_with_default_value_for_type(instance: Arc<Instance>, type_name: &str) {
let table_name = format!("test_table_with_{type_name}");
let create_sql = format!(
r#"create table test_table(
r#"create table {table_name}(
host string,
ts {type_name} DEFAULT CURRENT_TIMESTAMP,
cpu double default 0,
@@ -626,7 +656,7 @@ async fn test_insert_with_default_value_for_type(type_name: &str) {
// Insert with ts.
let output = execute_sql(
&instance,
"insert into test_table(host, cpu, ts) values ('host1', 1.1, 1000)",
&format!("insert into {table_name}(host, cpu, ts) values ('host1', 1.1, 1000)"),
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
@@ -634,12 +664,12 @@ async fn test_insert_with_default_value_for_type(type_name: &str) {
// Insert without ts, so it should be filled by default value.
let output = execute_sql(
&instance,
"insert into test_table(host, cpu) values ('host2', 2.2)",
&format!("insert into {table_name}(host, cpu) values ('host2', 2.2)"),
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = execute_sql(&instance, "select host, cpu from test_table").await;
let output = execute_sql(&instance, &format!("select host, cpu from {table_name}")).await;
let expected = "\
+-------+-----+
| host | cpu |
@@ -652,15 +682,17 @@ async fn test_insert_with_default_value_for_type(type_name: &str) {
check_output_stream(output, expected).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_insert_with_default_value() {
test_insert_with_default_value_for_type("timestamp").await;
test_insert_with_default_value_for_type("bigint").await;
// should apply to both instances. tracked in #1293
#[apply(standalone_instance_case)]
async fn test_insert_with_default_value(instance: Arc<dyn MockInstance>) {
test_insert_with_default_value_for_type(instance.frontend(), "timestamp").await;
test_insert_with_default_value_for_type(instance.frontend(), "bigint").await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_use_database() {
let instance = create_standalone_instance("test_use_database").await;
// should apply to both instances. tracked in #1294
#[apply(standalone_instance_case)]
async fn test_use_database(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let output = execute_sql(&instance, "create database db1").await;
assert!(matches!(output, Output::AffectedRows(1)));
@@ -717,9 +749,10 @@ async fn test_use_database() {
check_output_stream(output, expected).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_delete() {
let instance = create_standalone_instance("test_delete").await;
// should apply to both instances. tracked in #755
#[apply(standalone_instance_case)]
async fn test_delete(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let output = execute_sql(
&instance,
@@ -766,12 +799,11 @@ async fn test_delete() {
check_output_stream(output, expect).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_copy_to_s3() {
logging::init_default_ut_logging();
#[apply(standalone_instance_case)]
async fn test_execute_copy_to_s3(instance: Arc<dyn MockInstance>) {
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
let instance = create_standalone_instance("test_execute_copy_to_s3").await;
let instance = instance.frontend();
// setups
execute_sql(
@@ -805,12 +837,12 @@ async fn test_execute_copy_to_s3() {
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_copy_from_s3() {
#[apply(standalone_instance_case)]
async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
logging::init_default_ut_logging();
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
let instance = create_standalone_instance("test_execute_copy_from_s3").await;
let instance = instance.frontend();
// setups
execute_sql(
@@ -905,26 +937,26 @@ async fn test_execute_copy_from_s3() {
}
}
async fn execute_sql(instance: &MockStandaloneInstance, sql: &str) -> Output {
async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
}
async fn try_execute_sql(
instance: &MockStandaloneInstance,
instance: &Arc<Instance>,
sql: &str,
) -> Result<Output, crate::error::Error> {
try_execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
}
async fn try_execute_sql_in_db(
instance: &MockStandaloneInstance,
instance: &Arc<Instance>,
sql: &str,
db: &str,
) -> Result<Output, crate::error::Error> {
let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));
instance.instance.do_query(sql, query_ctx).await.remove(0)
instance.do_query(sql, query_ctx).await.remove(0)
}
async fn execute_sql_in_db(instance: &MockStandaloneInstance, sql: &str, db: &str) -> Output {
async fn execute_sql_in_db(instance: &Arc<Instance>, sql: &str, db: &str) -> Output {
try_execute_sql_in_db(instance, sql, db).await.unwrap()
}


@@ -12,15 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use rstest::rstest;
use rstest_reuse::apply;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use super::test_util::check_unordered_output_stream;
use crate::tests::test_util::setup_test_instance;
use super::test_util::{check_unordered_output_stream, standalone, standalone_instance_case};
use crate::instance::Instance;
use crate::tests::test_util::MockInstance;
#[allow(clippy::too_many_arguments)]
async fn create_insert_query_assert(
instance: Arc<Instance>,
create: &str,
insert: &str,
promql: &str,
@@ -30,37 +37,54 @@ async fn create_insert_query_assert(
lookback: Duration,
expected: &str,
) {
let instance = setup_test_instance("test_execute_insert").await;
instance.do_query(create, QueryContext::arc()).await;
instance.do_query(insert, QueryContext::arc()).await;
instance.execute_sql(create).await;
instance.execute_sql(insert).await;
let query = PromQuery {
query: promql.to_string(),
start: "0".to_string(),
end: "0".to_string(),
step: "5m".to_string(),
};
let QueryStatement::Promql(mut eval_stmt) = QueryLanguageParser::parse_promql(&query).unwrap() else { unreachable!() };
eval_stmt.start = start;
eval_stmt.end = end;
eval_stmt.interval = interval;
eval_stmt.lookback_delta = lookback;
let query_output = instance
.inner()
.execute_promql_statement(promql, start, end, interval, lookback, QueryContext::arc())
.plan_exec(QueryStatement::Promql(eval_stmt), QueryContext::arc())
.await
.unwrap();
let expected = String::from(expected);
check_unordered_output_stream(query_output, expected).await;
}
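One detail in the rewritten helper: the let-else on the parser result destructures the Promql variant and diverges with unreachable!() for anything else, which keeps the happy path unindented. A tiny illustration of the pattern (toy enum, not the query crate's):

// let-else: bind one variant's payload, or diverge for any other variant.
enum Stmt {
    Promql(String),
    Sql(String),
}

fn expect_promql(s: Stmt) -> String {
    let Stmt::Promql(expr) = s else {
        unreachable!("parse_promql always yields a Promql statement")
    };
    expr
}

fn main() {
    let _ = Stmt::Sql(String::new()); // silence the unused-variant warning
    assert_eq!(expect_promql(Stmt::Promql("up".into())), "up");
}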
#[allow(clippy::too_many_arguments)]
async fn create_insert_tql_assert(create: &str, insert: &str, tql: &str, expected: &str) {
let instance = setup_test_instance("test_execute_insert").await;
async fn create_insert_tql_assert(
instance: Arc<Instance>,
create: &str,
insert: &str,
tql: &str,
expected: &str,
) {
instance.do_query(create, QueryContext::arc()).await;
instance.do_query(insert, QueryContext::arc()).await;
instance.execute_sql(create).await;
instance.execute_sql(insert).await;
let query_output = instance.execute_sql(tql).await;
let expected = String::from(expected);
let query_output = instance
.do_query(tql, QueryContext::arc())
.await
.remove(0)
.unwrap();
check_unordered_output_stream(query_output, expected).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn sql_insert_tql_query_ceil() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn sql_insert_tql_query_ceil(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_tql_assert(
instance,
r#"create table http_requests_total (
host string,
cpu double,
@@ -102,9 +126,13 @@ async fn sql_insert_tql_query_ceil() {
.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn sql_insert_promql_query_ceil() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn sql_insert_promql_query_ceil(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
r#"create table http_requests_total (
host string,
cpu double,
@@ -181,9 +209,13 @@ fn unix_epoch_plus_100s() -> SystemTime {
// eval instant at 50m SUM BY (group) (http_requests{job="api-server"})
// {group="canary"} 700
// {group="production"} 300
#[tokio::test(flavor = "multi_thread")]
async fn aggregators_simple_sum() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn aggregators_simple_sum(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
"SUM BY (group) (http_requests{job=\"api-server\"})",
@@ -205,9 +237,13 @@ async fn aggregators_simple_sum() {
// eval instant at 50m avg by (group) (http_requests{job="api-server"})
// {group="canary"} 350
// {group="production"} 150
#[tokio::test(flavor = "multi_thread")]
async fn aggregators_simple_avg() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn aggregators_simple_avg(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
"AVG BY (group) (http_requests{job=\"api-server\"})",
@@ -229,9 +265,13 @@ async fn aggregators_simple_avg() {
// eval instant at 50m count by (group) (http_requests{job="api-server"})
// {group="canary"} 2
// {group="production"} 2
#[tokio::test(flavor = "multi_thread")]
async fn aggregators_simple_count() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn aggregators_simple_count(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
"COUNT BY (group) (http_requests{job=\"api-server\"})",
@@ -253,9 +293,13 @@ async fn aggregators_simple_count() {
// eval instant at 50m sum without (instance) (http_requests{job="api-server"})
// {group="canary",job="api-server"} 700
// {group="production",job="api-server"} 300
#[tokio::test(flavor = "multi_thread")]
async fn aggregators_simple_without() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn aggregators_simple_without(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
"sum without (instance) (http_requests{job=\"api-server\"})",
@@ -276,9 +320,13 @@ async fn aggregators_simple_without() {
// # Empty by.
// eval instant at 50m sum by () (http_requests{job="api-server"})
// {} 1000
#[tokio::test(flavor = "multi_thread")]
async fn aggregators_empty_by() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn aggregators_empty_by(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
"sum by () (http_requests{job=\"api-server\"})",
@@ -298,9 +346,13 @@ async fn aggregators_empty_by() {
// # No by/without.
// eval instant at 50m sum(http_requests{job="api-server"})
// {} 1000
#[tokio::test(flavor = "multi_thread")]
async fn aggregators_no_by_without() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn aggregators_no_by_without(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
r#"sum (http_requests{job="api-server"})"#,
@@ -321,9 +373,13 @@ async fn aggregators_no_by_without() {
// eval instant at 50m sum without () (http_requests{job="api-server",group="production"})
// {group="production",job="api-server",instance="0"} 100
// {group="production",job="api-server",instance="1"} 200
#[tokio::test(flavor = "multi_thread")]
async fn aggregators_empty_without() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn aggregators_empty_without(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
r#"sum without () (http_requests{job="api-server",group="production"})"#,
@@ -345,9 +401,13 @@ async fn aggregators_empty_without() {
// eval instant at 50m sum(http_requests) by (job) + min(http_requests) by (job) + max(http_requests) by (job) + avg(http_requests) by (job)
// {job="app-server"} 4550
// {job="api-server"} 1750
#[tokio::test(flavor = "multi_thread")]
async fn aggregators_complex_combined_aggrs() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn aggregators_complex_combined_aggrs(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
"sum(http_requests) by (job) + min(http_requests) by (job) + max(http_requests) by (job) + avg(http_requests) by (job)",
@@ -366,9 +426,13 @@ async fn aggregators_complex_combined_aggrs() {
}
// This is not from the prometheus test set. It's derived from `aggregators_complex_combined_aggrs()`
#[tokio::test(flavor = "multi_thread")]
async fn two_aggregators_combined_aggrs() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn two_aggregators_combined_aggrs(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
"sum(http_requests) by (job) + min(http_requests) by (job) ",
@@ -389,10 +453,14 @@ async fn two_aggregators_combined_aggrs() {
// eval instant at 50m stddev by (instance)(http_requests)
// {instance="0"} 223.60679774998
// {instance="1"} 223.60679774998
#[tokio::test(flavor = "multi_thread")]
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
#[ignore = "TODO(ruihang): fix this case"]
async fn stddev_by_label() {
async fn stddev_by_label(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
r#"stddev by (instance)(http_requests)"#,
@@ -411,9 +479,13 @@ async fn stddev_by_label() {
}
// This is not derived from prometheus
#[tokio::test(flavor = "multi_thread")]
async fn binary_op_plain_columns() {
// should apply to both instances. tracked in #1296
#[apply(standalone_instance_case)]
async fn binary_op_plain_columns(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
create_insert_query_assert(
instance,
AGGREGATORS_CREATE_TABLE,
AGGREGATORS_INSERT_DATA,
r#"http_requests - http_requests"#,


@@ -12,8 +12,80 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::util;
use rstest_reuse::{self, template};
use crate::instance::Instance;
use crate::tests::{
create_distributed_instance, create_standalone_instance, MockDistributedInstance,
MockStandaloneInstance,
};
pub(crate) trait MockInstance {
fn frontend(&self) -> Arc<Instance>;
fn is_distributed_mode(&self) -> bool;
}
impl MockInstance for MockStandaloneInstance {
fn frontend(&self) -> Arc<Instance> {
self.instance.clone()
}
fn is_distributed_mode(&self) -> bool {
false
}
}
impl MockInstance for MockDistributedInstance {
fn frontend(&self) -> Arc<Instance> {
self.frontend.clone()
}
fn is_distributed_mode(&self) -> bool {
true
}
}
pub(crate) async fn standalone() -> Arc<dyn MockInstance> {
let test_name = uuid::Uuid::new_v4().to_string();
let instance = create_standalone_instance(&test_name).await;
Arc::new(instance)
}
pub(crate) async fn distributed() -> Arc<dyn MockInstance> {
let test_name = uuid::Uuid::new_v4().to_string();
let instance = create_distributed_instance(&test_name).await;
Arc::new(instance)
}
#[template]
#[rstest]
#[case::test_with_standalone(standalone())]
#[case::test_with_distributed(distributed())]
#[awt]
#[tokio::test(flavor = "multi_thread")]
pub(crate) fn both_instances_cases(
#[future]
#[case]
instance: Arc<dyn MockInstance>,
) {
}
#[template]
#[rstest]
#[case::test_with_standalone(standalone())]
#[awt]
#[tokio::test(flavor = "multi_thread")]
pub(crate) fn standalone_instance_case(
#[future]
#[case]
instance: Arc<dyn MockInstance>,
) {
}
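For readers new to rstest_reuse: #[template] defines a reusable list of #[case]s, and #[apply] stamps the annotated test out once per case; #[awt] with #[future] makes rstest await the async fixture constructors (standalone()/distributed()) before the test body runs. A minimal self-contained example with toy cases, assuming rstest and rstest_reuse as dev-dependencies and the use rstest_reuse; at the crate root that the lib.rs change above arranges:

use rstest::rstest;
use rstest_reuse::{apply, template};

// Define the case list once...
#[template]
#[rstest]
#[case::two(2, 4)]
#[case::three(3, 9)]
fn square_cases(#[case] n: u64, #[case] expected: u64) {}

// ...and reuse it for as many tests as needed.
#[apply(square_cases)]
fn test_square(n: u64, expected: u64) {
    assert_eq!(n * n, expected);
}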
pub(crate) async fn check_output_stream(output: Output, expected: String) {
let recordbatches = match output {
@@ -24,3 +96,25 @@ pub(crate) async fn check_output_stream(output: Output, expected: String) {
let pretty_print = recordbatches.pretty_print().unwrap();
assert_eq!(pretty_print, expected, "{}", pretty_print);
}
pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str) {
let sort_table = |table: &str| -> String {
let replaced = table.replace("\\n", "\n");
let mut lines = replaced.split('\n').collect::<Vec<_>>();
lines.sort();
lines
.into_iter()
.map(|s| s.to_string())
.reduce(|acc, e| format!("{acc}\\n{e}"))
.unwrap()
};
let recordbatches = match output {
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};
let pretty_print = sort_table(&recordbatches.pretty_print().unwrap());
let expected = sort_table(expected);
assert_eq!(pretty_print, expected);
}
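Both the expected text and the actual pretty-printed table pass through the same sort_table closure, so the assertion is order-insensitive, which matters because row order is not guaranteed, particularly in distributed mode. The core idea in isolation, on plain strings rather than record batches:

// Order-insensitive comparison: sort lines before comparing.
fn sorted_lines(table: &str) -> Vec<&str> {
    let mut lines: Vec<&str> = table.lines().collect();
    lines.sort_unstable();
    lines
}

fn main() {
    let a = "| numbers |\n| scripts |";
    let b = "| scripts |\n| numbers |";
    // Equal no matter which order the rows arrived in.
    assert_eq!(sorted_lines(a), sorted_lines(b));
}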


@@ -22,6 +22,7 @@ use axum::{http, Json};
use base64::DecodeError;
use catalog;
use common_error::prelude::*;
use query::parser::PromQuery;
use serde_json::json;
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;
@@ -281,6 +282,13 @@ pub enum Error {
source: http::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to parse PromQL: {query:?}, source: {source}"))]
ParsePromQL {
query: PromQuery,
#[snafu(backtrace)]
source: query::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -346,6 +354,8 @@ impl ErrorExt for Error {
#[cfg(feature = "mem-prof")]
DumpProfileData { source, .. } => source.status_code(),
InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
ParsePromQL { source, .. } => source.status_code(),
}
}
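The new variant follows the usual snafu shape: the derive generates a ParsePromQLSnafu context selector, with_context attaches the offending query to the parser error, and status_code is delegated to the source. A self-contained toy of the same pattern, assuming snafu 0.7-style selectors (toy error and source types, not the servers crate's):

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to parse PromQL: {query:?}, source: {source}"))]
    ParsePromQl {
        query: String,
        source: std::num::ParseIntError,
    },
}

// `ParsePromQlSnafu` is the context selector generated for the variant.
fn parse_step(query: &str, step: &str) -> Result<u64, Error> {
    step.parse::<u64>().with_context(|_| ParsePromQlSnafu {
        query: query.to_string(),
    })
}

fn main() {
    let err = parse_step("up", "5m").unwrap_err();
    assert!(err.to_string().contains("Failed to parse PromQL"));
}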