feat: PromQL handler in query engine (#861)

* example promql test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* make the mock test works

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update planner test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippys

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add license header

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-01-11 11:31:07 +08:00
committed by GitHub
parent 9428e70971
commit a9b42b436d
17 changed files with 324 additions and 46 deletions

2
Cargo.lock generated
View File

@@ -5140,6 +5140,7 @@ dependencies = [
name = "promql"
version = "0.1.0"
dependencies = [
"async-trait",
"bytemuck",
"catalog",
"common-catalog",
@@ -5389,6 +5390,7 @@ dependencies = [
"num-traits",
"once_cell",
"paste",
"promql",
"promql-parser",
"rand 0.8.5",
"serde",

View File

@@ -159,6 +159,11 @@ impl Instance {
let stmt = QueryLanguageParser::parse_sql(sql).context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}
pub async fn execute_promql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
let stmt = QueryLanguageParser::parse_promql(sql).context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}
}
// TODO(LFC): Refactor consideration: move this function to some helper mod,

View File

@@ -13,4 +13,5 @@
// limitations under the License.
mod instance_test;
mod promql_test;
pub(crate) mod test_util;

View File

@@ -21,7 +21,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
use session::context::QueryContext;
use crate::tests::test_util::{self, MockInstance};
use crate::tests::test_util::{self, check_output_stream, setup_test_instance, MockInstance};
#[tokio::test(flavor = "multi_thread")]
async fn test_create_database_and_insert_query() {
@@ -69,6 +69,7 @@ async fn test_create_database_and_insert_query() {
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_issue477_same_table_name_in_different_databases() {
let instance = MockInstance::new("test_issue477_same_table_name_in_different_databases").await;
@@ -158,19 +159,6 @@ async fn assert_query_result(instance: &MockInstance, sql: &str, ts: i64, host:
}
}
async fn setup_test_instance(test_name: &str) -> MockInstance {
let instance = MockInstance::new(test_name).await;
test_util::create_test_table(
instance.inner(),
ConcreteDataType::timestamp_millisecond_datatype(),
)
.await
.unwrap();
instance
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert() {
let instance = setup_test_instance("test_execute_insert").await;
@@ -354,16 +342,6 @@ pub async fn test_execute_create() {
assert!(matches!(output, Output::AffectedRows(0)));
}
async fn check_output_stream(output: Output, expected: String) {
let recordbatches = match output {
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};
let pretty_print = recordbatches.pretty_print().unwrap();
assert_eq!(pretty_print, expected);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_alter_table() {
let instance = setup_test_instance("test_alter_table").await;

View File

@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use session::context::QueryContext;
use crate::tests::test_util::{check_output_stream, setup_test_instance};
#[tokio::test(flavor = "multi_thread")]
async fn sql_insert_promql_query_ceil() {
let instance = setup_test_instance("test_execute_insert").await;
let query_ctx = Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME.to_owned(),
DEFAULT_SCHEMA_NAME.to_owned(),
));
let put_output = instance
.inner()
.execute_sql(
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 0),
('host1', 66.6, 2048, 2000),
('host1', 66.6, 4096, 5000),
('host1', 43.1, 8192, 7000),
('host1', 19.1, 10240, 9000),
('host1', 99.1, 20480, 10000),
('host1', 999.9, 40960, 21000),
('host1', 31.9, 8192, 22000),
('host1', 95.4, 333.3, 32000),
('host1', 12423.1, 1333.3, 49000),
('host1', 0, 2333.3, 80000),
('host1', 49, 3333.3, 99000);
"#,
query_ctx.clone(),
)
.await
.unwrap();
assert!(matches!(put_output, Output::AffectedRows(12)));
let promql = "ceil(demo{host=\"host1\"})";
let query_output = instance
.inner()
.execute_promql(promql, query_ctx)
.await
.unwrap();
let expected = String::from(
"\
+---------------------+----------------+-------------------+
| ts | ceil(demo.cpu) | ceil(demo.memory) |
+---------------------+----------------+-------------------+
| 1970-01-01T00:00:00 | 67 | 1024 |
| 1970-01-01T00:00:05 | 67 | 4096 |
| 1970-01-01T00:00:10 | 100 | 20480 |
| 1970-01-01T00:00:50 | 12424 | 1334 |
| 1970-01-01T00:01:20 | 0 | 2334 |
| 1970-01-01T00:01:40 | 49 | 3334 |
+---------------------+----------------+-------------------+",
);
check_output_stream(query_output, expected).await;
}

View File

@@ -16,6 +16,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_query::Output;
use common_recordbatch::util;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use mito::config::EngineConfig;
@@ -85,7 +87,7 @@ pub(crate) async fn create_test_table(
ts_type: ConcreteDataType,
) -> Result<()> {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ts_type, true).with_time_index(true),
@@ -146,3 +148,26 @@ pub async fn create_mock_sql_handler() -> SqlHandler {
SqlHandler::new(mock_engine, catalog_manager, factory.query_engine())
}
pub(crate) async fn setup_test_instance(test_name: &str) -> MockInstance {
let instance = MockInstance::new(test_name).await;
create_test_table(
instance.inner(),
ConcreteDataType::timestamp_millisecond_datatype(),
)
.await
.unwrap();
instance
}
pub async fn check_output_stream(output: Output, expected: String) {
let recordbatches = match output {
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};
let pretty_print = recordbatches.pretty_print().unwrap();
assert_eq!(pretty_print, expected);
}

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
bytemuck = "1.12"
catalog = { path = "../catalog" }
common-error = { path = "../common/error" }
@@ -13,10 +14,10 @@ datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "d2f6ec4bbbae19b5156cfc977a6e7de9c6684651" }
query = { path = "../query" }
session = { path = "../session" }
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }
query = { path = "../query" }

View File

@@ -26,6 +26,7 @@ pub enum Error {
#[snafu(display("Internal error during build DataFusion plan, error: {}", source))]
DataFusionPlanning {
source: datafusion::error::DataFusionError,
backtrace: Backtrace,
},
#[snafu(display("Unexpected plan or expression: {}", desc))]
@@ -37,6 +38,9 @@ pub enum Error {
#[snafu(display("Cannot find time index column in table {}", table))]
TimeIndexNotFound { table: String, backtrace: Backtrace },
#[snafu(display("Cannot find value columns in table {}", table))]
ValueNotFound { table: String, backtrace: Backtrace },
#[snafu(display("Cannot find the table {}", table))]
TableNotFound {
table: String,
@@ -84,6 +88,7 @@ impl ErrorExt for Error {
use Error::*;
match self {
TimeIndexNotFound { .. }
| ValueNotFound { .. }
| UnsupportedExpr { .. }
| MultipleVector { .. }
| ExpectExpr { .. } => StatusCode::InvalidArguments,

View File

@@ -14,11 +14,13 @@
mod instant_manipulate;
mod normalize;
mod planner;
mod range_manipulate;
use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
pub use normalize::{SeriesNormalize, SeriesNormalizeExec, SeriesNormalizeStream};
pub use planner::PromExtensionPlanner;
pub use range_manipulate::{RangeManipulate, RangeManipulateExec, RangeManipulateStream};
pub(crate) type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;

View File

@@ -0,0 +1,49 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::planner::ExtensionPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use super::{InstantManipulate, RangeManipulate};
use crate::extension_plan::SeriesNormalize;
pub struct PromExtensionPlanner {}
#[async_trait]
impl ExtensionPlanner for PromExtensionPlanner {
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
_logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> DfResult<Option<Arc<dyn ExecutionPlan>>> {
if let Some(node) = node.as_any().downcast_ref::<SeriesNormalize>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<InstantManipulate>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<RangeManipulate>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else {
Ok(None)
}
}
}

View File

@@ -18,8 +18,9 @@ use std::time::{Duration, UNIX_EPOCH};
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::{
BinaryExpr, BuiltinScalarFunction, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
BinaryExpr, BuiltinScalarFunction, Extension, Filter, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::optimizer::utils;
use datafusion::prelude::{Column, Expr as DfExpr};
use datafusion::scalar::ScalarValue;
use datafusion::sql::planner::ContextProvider;
@@ -32,6 +33,7 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
DataFusionPlanningSnafu, ExpectExprSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu,
TableNotFoundSnafu, TimeIndexNotFoundSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
ValueNotFoundSnafu,
};
use crate::extension_plan::{InstantManipulate, Millisecond, SeriesNormalize};
@@ -149,6 +151,8 @@ impl<S: ContextProvider> PromPlanner<S> {
LogicalPlanBuilder::from(input)
.project(func_exprs)
.context(DataFusionPlanningSnafu)?
.filter(self.create_empty_values_filter_expr()?)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?
}
@@ -178,18 +182,38 @@ impl<S: ContextProvider> PromPlanner<S> {
label_matchers: Matchers,
) -> Result<LogicalPlan> {
let table_name = self.ctx.table_name.clone().unwrap();
// TODO(ruihang): add time range filter
let filter = self.matchers_to_expr(label_matchers)?;
let table_scan = self.create_table_scan_plan(&table_name, filter)?;
let offset = offset.unwrap_or_default();
// make filter exprs
let mut filters = self.matchers_to_expr(label_matchers)?;
filters.push(self.create_time_index_column_expr()?.gt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(Some(self.ctx.start), None),
)));
filters.push(self.create_time_index_column_expr()?.lt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(Some(self.ctx.end), None),
)));
// make table scan with filter exprs
let table_scan = self.create_table_scan_plan(&table_name, filters.clone())?;
// make filter plan
let filter_plan = LogicalPlan::Filter(
Filter::try_new(
// safety: at least there are two exprs that filter timestamp column.
utils::conjunction(filters.into_iter()).unwrap(),
Arc::new(table_scan),
)
.context(DataFusionPlanningSnafu)?,
);
// make series_normalize plan
let offset = offset.unwrap_or_default();
let series_normalize = SeriesNormalize::new(
offset,
self.ctx
.time_index_column
.clone()
.with_context(|| TimeIndexNotFoundSnafu { table: table_name })?,
table_scan,
filter_plan,
);
let logical_plan = LogicalPlan::Extension(Extension {
node: Arc::new(series_normalize),
@@ -272,6 +296,7 @@ impl<S: ContextProvider> PromPlanner<S> {
.value_column_names()
.cloned()
.collect();
self.ctx.value_columns = values;
Ok(())
@@ -350,6 +375,18 @@ impl<S: ContextProvider> PromPlanner<S> {
.with_context(|| TimeIndexNotFoundSnafu { table: "unknown" })?,
)))
}
fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
let mut exprs = Vec::with_capacity(self.ctx.value_columns.len());
for value in &self.ctx.value_columns {
let expr = DfExpr::Column(Column::from_name(value)).is_not_null();
exprs.push(expr);
}
utils::conjunction(exprs.into_iter()).context(ValueNotFoundSnafu {
table: self.ctx.table_name.clone().unwrap(),
})
}
}
#[derive(Default, Debug)]
@@ -493,10 +530,12 @@ mod test {
let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();
let expected = String::from(
"Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\")] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
"Filter: some_metric.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\
\n Projection: some_metric.timestamp, TEMPLATE(some_metric.field_0) [timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: tag_0 != Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 != Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]",
).replace("TEMPLATE", plan_name);
assert_eq!(plan.display_indent_schema().to_string(), expected);

View File

@@ -26,6 +26,7 @@ futures = "0.3"
futures-util.workspace = true
metrics = "0.20"
once_cell = "1.10"
promql = { path = "../promql" }
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "d2f6ec4bbbae19b5156cfc977a6e7de9c6684651" }
serde.workspace = true
serde_json = "1.0"

View File

@@ -34,6 +34,8 @@ use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::timer;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use promql::planner::PromPlanner;
use promql_parser::parser::EvalStmt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::statement::Statement;
@@ -41,7 +43,7 @@ use sql::statements::statement::Statement;
pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::datafusion::planner::DfPlanner;
use crate::error::{QueryExecutionSnafu, Result};
use crate::error::{QueryExecutionSnafu, QueryPlanSnafu, Result};
use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
use crate::parser::QueryStatement;
@@ -66,8 +68,19 @@ impl DatafusionQueryEngine {
fn plan_sql_stmt(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let context_provider = DfContextProviderAdapter::new(self.state.clone(), query_ctx);
let planner = DfPlanner::new(&context_provider);
planner
.statement_to_plan(stmt)
.map_err(BoxedError::new)
.context(QueryPlanSnafu)
}
planner.statement_to_plan(stmt)
// TODO(ruihang): test this method once parser is ready.
fn plan_promql_stmt(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
let context_provider = DfContextProviderAdapter::new(self.state.clone(), query_ctx);
PromPlanner::stmt_to_plan(stmt, context_provider)
.map(LogicalPlan::DfPlan)
.map_err(BoxedError::new)
.context(QueryPlanSnafu)
}
}
@@ -85,7 +98,7 @@ impl QueryEngine for DatafusionQueryEngine {
) -> Result<LogicalPlan> {
match stmt {
QueryStatement::Sql(stmt) => self.plan_sql_stmt(stmt, query_ctx),
QueryStatement::Promql(_) => unimplemented!(),
QueryStatement::Promql(stmt) => self.plan_promql_stmt(stmt, query_ctx),
}
}

View File

@@ -15,6 +15,7 @@
//! query engine metrics
pub static METRIC_PARSE_SQL_ELAPSED: &str = "query.parse_sql_elapsed";
pub static METRIC_PARSE_PROMQL_ELAPSED: &str = "query.parse_promql_elapsed";
pub static METRIC_OPTIMIZE_LOGICAL_ELAPSED: &str = "query.optimize_logicalplan_elapsed";
pub static METRIC_OPTIMIZE_PHYSICAL_ELAPSED: &str = "query.optimize_physicalplan_elapsed";
pub static METRIC_CREATE_PHYSICAL_ELAPSED: &str = "query.create_physicalplan_elapsed";

View File

@@ -12,16 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_error::prelude::BoxedError;
use common_telemetry::timer;
use promql_parser::parser::EvalStmt;
use promql_parser::label::{MatchOp, Matcher, Matchers};
use promql_parser::parser::{EvalStmt, Expr as PromExpr, Function, ValueType};
use snafu::ResultExt;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use crate::error::{MultipleStatementsSnafu, QueryParseSnafu, Result};
use crate::metric::METRIC_PARSE_SQL_ELAPSED;
use crate::metric::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED};
#[derive(Debug, Clone)]
pub enum QueryStatement {
@@ -48,6 +51,50 @@ impl QueryLanguageParser {
Ok(QueryStatement::Sql(statement.pop().unwrap()))
}
}
// TODO(ruihang): implement this method when parser is ready.
pub fn parse_promql(_promql: &str) -> Result<QueryStatement> {
let _timer = timer!(METRIC_PARSE_PROMQL_ELAPSED);
let prom_expr = PromExpr::Call {
func: Function {
name: "ceil",
arg_types: vec![ValueType::Vector],
variadic: false,
return_type: ValueType::Vector,
},
args: vec![Box::new(PromExpr::VectorSelector {
name: Some("demo".to_owned()),
offset: None,
start_or_end: None,
label_matchers: Matchers {
matchers: vec![
Matcher {
op: MatchOp::Equal,
name: "host".to_string(),
value: "host1".to_string(),
},
Matcher {
op: MatchOp::Equal,
name: promql_parser::label::METRIC_NAME.to_string(),
value: "demo".to_string(),
},
],
},
})],
};
let eval_stmt = EvalStmt {
expr: prom_expr,
start: std::time::UNIX_EPOCH,
end: std::time::UNIX_EPOCH
.checked_add(Duration::from_secs(100))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
Ok(QueryStatement::Promql(eval_stmt))
}
}
#[cfg(test)]

View File

@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use catalog::CatalogListRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
@@ -23,15 +24,17 @@ use common_query::physical_plan::{SessionContext, TaskContext};
use common_query::prelude::ScalarUdf;
use datafusion::catalog::TableReference;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{SessionConfig, SessionState};
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion_common::ScalarValue;
use datafusion_expr::{LogicalPlan as DfLogicalPlan, TableSource};
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_sql::planner::ContextProvider;
use datatypes::arrow::datatypes::DataType;
use promql::extension_plan::PromExtensionPlanner;
use crate::datafusion::DfCatalogListAdapter;
use crate::optimizer::TypeConversionRule;
@@ -66,6 +69,7 @@ impl QueryEngineState {
let mut session_state = SessionState::with_config_rt(session_config, runtime_env);
session_state.optimizer = optimizer;
session_state.catalog_list = Arc::new(DfCatalogListAdapter::new(catalog_list.clone()));
session_state.query_planner = Arc::new(DfQueryPlanner::new());
let df_context = SessionContext::with_state(session_state);
@@ -158,3 +162,30 @@ impl QueryEngineState {
Ok(plan)
}
}
struct DfQueryPlanner {
physical_planner: DefaultPhysicalPlanner,
}
#[async_trait]
impl QueryPlanner for DfQueryPlanner {
async fn create_physical_plan(
&self,
logical_plan: &DfLogicalPlan,
session_state: &SessionState,
) -> DfResult<Arc<dyn ExecutionPlan>> {
self.physical_planner
.create_physical_plan(logical_plan, session_state)
.await
}
}
impl DfQueryPlanner {
fn new() -> Self {
Self {
physical_planner: DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
PromExtensionPlanner {},
)]),
}
}
}

View File

@@ -116,9 +116,14 @@ impl TableMeta {
pub fn value_column_names(&self) -> impl Iterator<Item = &String> {
let columns_schemas = &self.schema.column_schemas();
self.value_indices
.iter()
.map(|idx| &columns_schemas[*idx].name)
self.value_indices.iter().filter_map(|idx| {
let column = &columns_schemas[*idx];
if column.is_time_index() {
None
} else {
Some(&column.name)
}
})
}
/// Returns the new [TableMetaBuilder] after applying given `alter_kind`.