mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 12:30:38 +00:00
feat(flow): support prom ql(in tql) in flow (#6063)
* feat: support parse prom ql in create flow * refactor * fix: just run tql unmodified * refactor: determine type faster * fix: pass original query * tests: sqlness * test: fix format&chore * fix: get raw query * test: fix sqlness randomness * chore: what's the box for? * test: location_to_index * test: make sqlness more determinstic * fix: tmp add sleep 1s after flush_flow * undo test sleep 1s&rm done todo * chore: more tests
This commit is contained in:
@@ -65,6 +65,7 @@ servers.workspace = true
|
||||
session.workspace = true
|
||||
smallvec.workspace = true
|
||||
snafu.workspace = true
|
||||
sql.workspace = true
|
||||
store-api.workspace = true
|
||||
strum.workspace = true
|
||||
substrait.workspace = true
|
||||
|
||||
@@ -312,7 +312,7 @@ impl BatchingEngine {
|
||||
.unwrap_or("None".to_string())
|
||||
);
|
||||
|
||||
let task = BatchingTask::new(
|
||||
let task = BatchingTask::try_new(
|
||||
flow_id,
|
||||
&sql,
|
||||
plan,
|
||||
@@ -323,7 +323,7 @@ impl BatchingEngine {
|
||||
query_ctx,
|
||||
self.catalog_manager.clone(),
|
||||
rx,
|
||||
);
|
||||
)?;
|
||||
|
||||
let task_inner = task.clone();
|
||||
let engine = self.query_engine.clone();
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::ops::Deref;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
@@ -29,6 +28,7 @@ use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
|
||||
use datafusion::optimizer::AnalyzerRule;
|
||||
use datafusion::sql::unparser::expr_to_sql;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode};
|
||||
use datafusion_common::DFSchemaRef;
|
||||
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
@@ -37,6 +37,8 @@ use query::query_engine::DefaultSerializer;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::parser::{ParseOptions, ParserContext};
|
||||
use sql::statements::statement::Statement;
|
||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
@@ -68,13 +70,42 @@ use crate::{Error, FlowId};
|
||||
pub struct TaskConfig {
|
||||
pub flow_id: FlowId,
|
||||
pub query: String,
|
||||
plan: Arc<LogicalPlan>,
|
||||
/// output schema of the query
|
||||
pub output_schema: DFSchemaRef,
|
||||
pub time_window_expr: Option<TimeWindowExpr>,
|
||||
/// in seconds
|
||||
pub expire_after: Option<i64>,
|
||||
sink_table_name: [String; 3],
|
||||
pub source_table_names: HashSet<[String; 3]>,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
query_type: QueryType,
|
||||
}
|
||||
|
||||
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
|
||||
let stmts =
|
||||
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
ensure!(
|
||||
stmts.len() == 1,
|
||||
InvalidQuerySnafu {
|
||||
reason: format!("Expect only one statement, found {}", stmts.len())
|
||||
}
|
||||
);
|
||||
let stmt = &stmts[0];
|
||||
match stmt {
|
||||
Statement::Tql(_) => Ok(QueryType::Tql),
|
||||
_ => Ok(QueryType::Sql),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum QueryType {
|
||||
/// query is a tql query
|
||||
Tql,
|
||||
/// query is a sql query
|
||||
Sql,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -85,7 +116,7 @@ pub struct BatchingTask {
|
||||
|
||||
impl BatchingTask {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
pub fn try_new(
|
||||
flow_id: FlowId,
|
||||
query: &str,
|
||||
plan: LogicalPlan,
|
||||
@@ -96,20 +127,21 @@ impl BatchingTask {
|
||||
query_ctx: QueryContextRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
shutdown_rx: oneshot::Receiver<()>,
|
||||
) -> Self {
|
||||
Self {
|
||||
) -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
config: Arc::new(TaskConfig {
|
||||
flow_id,
|
||||
query: query.to_string(),
|
||||
plan: Arc::new(plan),
|
||||
time_window_expr,
|
||||
expire_after,
|
||||
sink_table_name,
|
||||
source_table_names: source_table_names.into_iter().collect(),
|
||||
catalog_manager,
|
||||
output_schema: plan.schema().clone(),
|
||||
query_type: determine_query_type(query, &query_ctx)?,
|
||||
}),
|
||||
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
|
||||
@@ -472,7 +504,7 @@ impl BatchingTask {
|
||||
.unwrap_or(u64::MIN);
|
||||
|
||||
let low_bound = Timestamp::new_second(low_bound as i64);
|
||||
let schema_len = self.config.plan.schema().fields().len();
|
||||
let schema_len = self.config.output_schema.fields().len();
|
||||
|
||||
let expire_time_window_bound = self
|
||||
.config
|
||||
@@ -481,104 +513,101 @@ impl BatchingTask {
|
||||
.map(|expr| expr.eval(low_bound))
|
||||
.transpose()?;
|
||||
|
||||
let new_plan = {
|
||||
let expr = {
|
||||
match expire_time_window_bound {
|
||||
Some((Some(l), Some(u))) => {
|
||||
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Can't get window size from {u:?} - {l:?}"),
|
||||
})?;
|
||||
let col_name = self
|
||||
.config
|
||||
.time_window_expr
|
||||
.as_ref()
|
||||
.map(|expr| expr.column_name.clone())
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Flow id={:?}, Failed to get column name from time window expr",
|
||||
self.config.flow_id
|
||||
),
|
||||
})?;
|
||||
|
||||
self.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(
|
||||
&col_name,
|
||||
Some(l),
|
||||
window_size,
|
||||
self.config.flow_id,
|
||||
Some(self),
|
||||
)?
|
||||
}
|
||||
_ => {
|
||||
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
|
||||
debug!(
|
||||
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
|
||||
);
|
||||
// clean dirty time window too, this could be from create flow's check_execute
|
||||
self.state.write().unwrap().dirty_time_windows.clean();
|
||||
|
||||
let mut add_auto_column =
|
||||
AddAutoColumnRewriter::new(sink_table_schema.clone());
|
||||
let plan = self
|
||||
.config
|
||||
.plan
|
||||
.deref()
|
||||
.clone()
|
||||
.rewrite(&mut add_auto_column)
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!(
|
||||
"Failed to rewrite plan:\n {}\n",
|
||||
self.config.plan
|
||||
),
|
||||
})?
|
||||
.data;
|
||||
let schema_len = plan.schema().fields().len();
|
||||
|
||||
// since no time window lower/upper bound is found, just return the original query(with auto columns)
|
||||
return Ok(Some((plan, schema_len)));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let (Some((Some(l), Some(u))), QueryType::Sql) =
|
||||
(expire_time_window_bound, &self.config.query_type)
|
||||
else {
|
||||
// either no time window or not a sql query, then just use the original query
|
||||
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
self.config.flow_id,
|
||||
expr.as_ref()
|
||||
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to generate filter expr from {expr:?}"),
|
||||
}))
|
||||
.transpose()?
|
||||
.map(|s| s.to_string())
|
||||
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
|
||||
);
|
||||
// clean dirty time window too, this could be from create flow's check_execute
|
||||
self.state.write().unwrap().dirty_time_windows.clean();
|
||||
|
||||
let Some(expr) = expr else {
|
||||
// no new data, hence no need to update
|
||||
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// TODO(discord9): add auto column or not? This might break compatibility for auto created sink table before this, but that's ok right?
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(expr);
|
||||
// TODO(discord9): not add auto column for tql query?
|
||||
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
|
||||
|
||||
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
|
||||
.await?;
|
||||
let rewrite = plan
|
||||
|
||||
let plan = plan
|
||||
.clone()
|
||||
.rewrite(&mut add_filter)
|
||||
.and_then(|p| p.data.rewrite(&mut add_auto_column))
|
||||
.rewrite(&mut add_auto_column)
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to rewrite plan:\n {}\n", plan),
|
||||
})?
|
||||
.data;
|
||||
// only apply optimize after complex rewrite is done
|
||||
apply_df_optimizer(rewrite).await?
|
||||
let schema_len = plan.schema().fields().len();
|
||||
|
||||
// since no time window lower/upper bound is found, just return the original query(with auto columns)
|
||||
return Ok(Some((plan, schema_len)));
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?}",
|
||||
self.config.flow_id, l, u
|
||||
);
|
||||
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
|
||||
reason: format!("Can't get window size from {u:?} - {l:?}"),
|
||||
})?;
|
||||
let col_name = self
|
||||
.config
|
||||
.time_window_expr
|
||||
.as_ref()
|
||||
.map(|expr| expr.column_name.clone())
|
||||
.with_context(|| UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Flow id={:?}, Failed to get column name from time window expr",
|
||||
self.config.flow_id
|
||||
),
|
||||
})?;
|
||||
|
||||
let expr = self
|
||||
.state
|
||||
.write()
|
||||
.unwrap()
|
||||
.dirty_time_windows
|
||||
.gen_filter_exprs(
|
||||
&col_name,
|
||||
Some(l),
|
||||
window_size,
|
||||
self.config.flow_id,
|
||||
Some(self),
|
||||
)?;
|
||||
|
||||
debug!(
|
||||
"Flow id={:?}, Generated filter expr: {:?}",
|
||||
self.config.flow_id,
|
||||
expr.as_ref()
|
||||
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to generate filter expr from {expr:?}"),
|
||||
}))
|
||||
.transpose()?
|
||||
.map(|s| s.to_string())
|
||||
);
|
||||
|
||||
let Some(expr) = expr else {
|
||||
// no new data, hence no need to update
|
||||
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let mut add_filter = AddFilterRewriter::new(expr);
|
||||
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
|
||||
|
||||
let plan =
|
||||
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
|
||||
let rewrite = plan
|
||||
.clone()
|
||||
.rewrite(&mut add_filter)
|
||||
.and_then(|p| p.data.rewrite(&mut add_auto_column))
|
||||
.with_context(|_| DatafusionSnafu {
|
||||
context: format!("Failed to rewrite plan:\n {}\n", plan),
|
||||
})?
|
||||
.data;
|
||||
// only apply optimize after complex rewrite is done
|
||||
let new_plan = apply_df_optimizer(rewrite).await?;
|
||||
|
||||
Ok(Some((new_plan, schema_len)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,15 +29,18 @@ use datafusion_common::tree_node::{
|
||||
use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
|
||||
use datafusion_expr::{Distinct, LogicalPlan, Projection};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING};
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::parser::{ParseOptions, ParserContext};
|
||||
use sql::statements::statement::Statement;
|
||||
use sql::statements::tql::Tql;
|
||||
use table::metadata::TableInfo;
|
||||
|
||||
use crate::adapter::AUTO_CREATED_PLACEHOLDER_TS_COL;
|
||||
use crate::df_optimizer::apply_df_optimizer;
|
||||
use crate::error::{DatafusionSnafu, ExternalSnafu, TableNotFoundSnafu};
|
||||
use crate::error::{DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, TableNotFoundSnafu};
|
||||
use crate::{Error, TableName};
|
||||
|
||||
pub async fn get_table_info_df_schema(
|
||||
@@ -73,21 +76,57 @@ pub async fn get_table_info_df_schema(
|
||||
}
|
||||
|
||||
/// Convert sql to datafusion logical plan
|
||||
/// Also support TQL (but only Eval not Explain or Analyze)
|
||||
pub async fn sql_to_df_plan(
|
||||
query_ctx: QueryContextRef,
|
||||
engine: QueryEngineRef,
|
||||
sql: &str,
|
||||
optimize: bool,
|
||||
) -> Result<LogicalPlan, Error> {
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let stmts =
|
||||
ParserContext::create_with_dialect(sql, query_ctx.sql_dialect(), ParseOptions::default())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
ensure!(
|
||||
stmts.len() == 1,
|
||||
InvalidQuerySnafu {
|
||||
reason: format!("Expect only one statement, found {}", stmts.len())
|
||||
}
|
||||
);
|
||||
let stmt = &stmts[0];
|
||||
let query_stmt = match stmt {
|
||||
Statement::Tql(tql) => match tql {
|
||||
Tql::Eval(eval) => {
|
||||
let eval = eval.clone();
|
||||
let promql = PromQuery {
|
||||
start: eval.start,
|
||||
end: eval.end,
|
||||
step: eval.step,
|
||||
query: eval.query,
|
||||
lookback: eval
|
||||
.lookback
|
||||
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
|
||||
};
|
||||
|
||||
QueryLanguageParser::parse_promql(&promql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
}
|
||||
_ => InvalidQuerySnafu {
|
||||
reason: format!("TQL statement {tql:?} is not supported, expect only TQL EVAL"),
|
||||
}
|
||||
.fail()?,
|
||||
},
|
||||
_ => QueryStatement::Sql(stmt.clone()),
|
||||
};
|
||||
let plan = engine
|
||||
.planner()
|
||||
.plan(&stmt, query_ctx)
|
||||
.plan(&query_stmt, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let plan = if optimize {
|
||||
apply_df_optimizer(plan).await?
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user