refactor: make use of the "pre_execute" in sql execution interceptor (#4875)
* feat: dynamic definition of plugin options
* rebase
* revert
* fix ci
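The heart of the change: `SqlQueryInterceptor::pre_execute` is now invoked after planning for statements that produce a `LogicalPlan`, so interceptors can inspect the plan instead of only the raw statement. Below is a self-contained toy model of the hook; `Statement`, `LogicalPlan`, and `QueryContextRef` are stand-ins for the real GreptimeDB types, and the trait shape is approximated from the call sites in the diff, not copied from the codebase.

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    pub struct Statement;               // stand-in for the SQL AST statement type
    pub struct LogicalPlan;             // stand-in for the query engine's plan type
    pub type QueryContextRef = Arc<()>; // stand-in for the session context handle

    pub trait SqlQueryInterceptor {
        type Error;

        /// Runs before execution; `plan` is `Some` for statements that were
        /// planned first (queries, EXPLAIN, DELETE, TQL) and `None` otherwise.
        fn pre_execute(
            &self,
            stmt: &Statement,
            plan: Option<&LogicalPlan>,
            query_ctx: QueryContextRef,
        ) -> Result<(), Self::Error>;
    }

    /// Example interceptor: counts statements and notes whether a plan was seen.
    #[derive(Default)]
    pub struct CountingInterceptor {
        executed: AtomicU64,
        planned: AtomicU64,
    }

    impl SqlQueryInterceptor for CountingInterceptor {
        type Error = std::convert::Infallible;

        fn pre_execute(
            &self,
            _stmt: &Statement,
            plan: Option<&LogicalPlan>,
            _query_ctx: QueryContextRef,
        ) -> Result<(), Self::Error> {
            self.executed.fetch_add(1, Ordering::Relaxed);
            if plan.is_some() {
                self.planned.fetch_add(1, Ordering::Relaxed);
            }
            Ok(())
        }
    }
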
Cargo.lock (generated)
@@ -1788,7 +1788,6 @@ dependencies = [
  "tokio-stream",
  "tonic 0.11.0",
  "tracing",
- "tracing-subscriber",
 ]

 [[package]]

@@ -180,13 +180,16 @@ sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "5
 ] }
 strum = { version = "0.25", features = ["derive"] }
 tempfile = "3"
-tokio = { version = "1.36", features = ["full"] }
+tokio = { version = "1.40", features = ["full"] }
 tokio-postgres = "0.7"
 tokio-stream = { version = "0.1" }
 tokio-util = { version = "0.7", features = ["io-util", "compat"] }
 toml = "0.8.8"
 tonic = { version = "0.11", features = ["tls", "gzip", "zstd"] }
 tower = { version = "0.4" }
+tracing-appender = "0.2"
+tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
+typetag = "0.2"
 uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] }
 zstd = "0.13"

@@ -45,7 +45,6 @@ common-grpc-expr.workspace = true
 datanode.workspace = true
 derive-new = "0.5"
 tracing = "0.1"
-tracing-subscriber = { version = "0.3", features = ["env-filter"] }

 [dev-dependencies.substrait_proto]
 package = "substrait"

@@ -78,7 +78,7 @@ table.workspace = true
 tokio.workspace = true
 toml.workspace = true
 tonic.workspace = true
-tracing-appender = "0.2"
+tracing-appender.workspace = true

 [target.'cfg(not(windows))'.dependencies]
 tikv-jemallocator = "0.6"

@@ -174,7 +174,7 @@ impl Repl {

             let plan = query_engine
                 .planner()
-                .plan(stmt, query_ctx.clone())
+                .plan(&stmt, query_ctx.clone())
                 .await
                 .context(PlanStatementSnafu)?;

@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::net::SocketAddr;
 use std::sync::Arc;
 use std::{fs, path};

@@ -250,6 +251,13 @@ pub struct Instance {
     _guard: Vec<WorkerGuard>,
 }

+impl Instance {
+    /// Find the socket addr of a server by its `name`.
+    pub async fn server_addr(&self, name: &str) -> Option<SocketAddr> {
+        self.frontend.server_handlers().addr(name).await
+    }
+}
+
 #[async_trait]
 impl App for Instance {
     fn name(&self) -> &str {

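A plausible caller of the new helper, e.g. an integration test that needs the dynamically bound address; the server name string here is illustrative rather than a constant taken from the codebase:

    // Sketch: look up a server's bound address by name (name is illustrative).
    async fn http_addr(instance: &Instance) -> std::net::SocketAddr {
        instance
            .server_addr("HTTP_SERVER")
            .await
            .expect("HTTP server should be started")
    }
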
@@ -340,7 +348,8 @@ pub struct StartCommand {
 }

 impl StartCommand {
-    fn load_options(
+    /// Load the GreptimeDB options from various sources (command line, config file or env).
+    pub fn load_options(
         &self,
         global_options: &GlobalOptions,
     ) -> Result<GreptimeOptions<StandaloneOptions>> {

@@ -430,7 +439,8 @@ impl StartCommand {
     #[allow(unreachable_code)]
     #[allow(unused_variables)]
     #[allow(clippy::diverging_sub_expression)]
-    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
+    /// Build GreptimeDB instance with the loaded options.
+    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
         common_runtime::init_global_runtimes(&opts.runtime);

         let guard = common_telemetry::init_global_logging(

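With `load_options` and `build` public, embedders (for example plugin test harnesses) can drive the standalone lifecycle programmatically. A rough sketch, assuming a `StartCommand` has already been parsed from CLI arguments and that `GlobalOptions` has a usable default; both assumptions go beyond what this diff shows:

    // Sketch only: field setup on StartCommand/GlobalOptions is elided.
    async fn start_standalone(cmd: StartCommand) -> Result<Instance> {
        let opts = cmd.load_options(&GlobalOptions::default())?;
        cmd.build(opts).await
    }
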
@@ -60,7 +60,7 @@ table.workspace = true
 tokio.workspace = true
 tokio-postgres = { workspace = true, optional = true }
 tonic.workspace = true
-typetag = "0.2"
+typetag.workspace = true

 [dev-dependencies]
 chrono.workspace = true

@@ -32,7 +32,7 @@ serde.workspace = true
 serde_json.workspace = true
 tokio.workspace = true
 tracing = "0.1"
-tracing-appender = "0.2"
+tracing-appender.workspace = true
 tracing-log = "0.1"
 tracing-opentelemetry = "0.22.0"
-tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
+tracing-subscriber.workspace = true

@@ -106,7 +106,7 @@ pub async fn sql_to_flow_plan(
         .context(ExternalSnafu)?;
     let plan = engine
         .planner()
-        .plan(stmt, query_ctx)
+        .plan(&stmt, query_ctx)
         .await
         .map_err(BoxedError::new)
         .context(ExternalSnafu)?;

@@ -278,7 +278,7 @@ mod test {
         let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
         let plan = engine
             .planner()
-            .plan(stmt, QueryContext::arc())
+            .plan(&stmt, QueryContext::arc())
             .await
             .unwrap();
         let plan = apply_df_optimizer(plan).await.unwrap();

@@ -300,7 +300,7 @@ mod test {
         let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
         let plan = engine
             .planner()
-            .plan(stmt, QueryContext::arc())
+            .plan(&stmt, QueryContext::arc())
             .await
             .unwrap();
         let plan = apply_df_optimizer(plan).await;

@@ -313,6 +313,14 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to init plugin"))]
+    // this comment is to bypass the unused snafu check in "check-snafu.py"
+    InitPlugin {
+        #[snafu(implicit)]
+        location: Location,
+        source: BoxedError,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;

@@ -375,8 +383,9 @@ impl ErrorExt for Error {
             | Error::ExecLogicalPlan { source, .. } => source.status_code(),

             Error::InvokeRegionServer { source, .. } => source.status_code(),

-            Error::External { source, .. } => source.status_code(),
+            Error::External { source, .. } | Error::InitPlugin { source, .. } => {
+                source.status_code()
+            }
             Error::FindTableRoute { source, .. } => source.status_code(),

             #[cfg(feature = "python")]

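For reference, a self-contained illustration of how a snafu variant like `InitPlugin` is typically constructed at a call site; `BoxedError` is modeled as a boxed std error here and the `location` field is omitted, so this is shape-compatible rather than identical:

    use snafu::{ResultExt, Snafu};

    type BoxedError = Box<dyn std::error::Error + Send + Sync>;

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Failed to init plugin"))]
        InitPlugin { source: BoxedError },
    }

    fn setup_plugins() -> Result<(), Error> {
        // A failing plugin-setup step, modeled as an erased error.
        let setup: Result<(), BoxedError> = Err("bad plugin option".into());
        setup.context(InitPluginSnafu)
    }
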
@@ -225,11 +225,45 @@ impl Instance {
     async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
         check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

-        let stmt = QueryStatement::Sql(stmt);
-        self.statement_executor
-            .execute_stmt(stmt, query_ctx)
-            .await
-            .context(TableOperationSnafu)
+        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
+        let query_interceptor = query_interceptor.as_ref();
+
+        let output = match stmt {
+            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
+                let stmt = QueryStatement::Sql(stmt);
+                let plan = self
+                    .statement_executor
+                    .plan(&stmt, query_ctx.clone())
+                    .await?;
+
+                let QueryStatement::Sql(stmt) = stmt else {
+                    unreachable!()
+                };
+                query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
+
+                self.statement_executor.exec_plan(plan, query_ctx).await
+            }
+            Statement::Tql(tql) => {
+                let plan = self
+                    .statement_executor
+                    .plan_tql(tql.clone(), &query_ctx)
+                    .await?;
+
+                query_interceptor.pre_execute(
+                    &Statement::Tql(tql),
+                    Some(&plan),
+                    query_ctx.clone(),
+                )?;
+
+                self.statement_executor.exec_plan(plan, query_ctx).await
+            }
+            _ => {
+                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
+
+                self.statement_executor.execute_sql(stmt, query_ctx).await
+            }
+        };
+        output.context(TableOperationSnafu)
     }
 }

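How an interceptor gets into `self.plugins` is not part of this diff; assuming the `Plugins` container exposes an `insert` mirroring the `get::<SqlQueryInterceptorRef<Error>>()` used above, registration would look roughly like:

    // Sketch: register an interceptor so query_statement's plugins.get() finds it.
    // `MyInterceptor` is a hypothetical SqlQueryInterceptor implementation.
    let interceptor: SqlQueryInterceptorRef<Error> = Arc::new(MyInterceptor::default());
    plugins.insert::<SqlQueryInterceptorRef<Error>>(interceptor);
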
@@ -255,14 +289,6 @@ impl SqlQueryHandler for Instance {
             Ok(stmts) => {
                 let mut results = Vec::with_capacity(stmts.len());
                 for stmt in stmts {
-                    // TODO(sunng87): figure out at which stage we can call
-                    // this hook after ArrowFlight adoption. We need to provide
-                    // LogicalPlan as to this hook.
-                    if let Err(e) = query_interceptor.pre_execute(&stmt, None, query_ctx.clone()) {
-                        results.push(Err(e));
-                        break;
-                    }
-
                     if let Err(e) = checker
                         .check_permission(
                             query_ctx.current_user(),

@@ -341,7 +367,7 @@ impl SqlQueryHandler for Instance {
         let plan = self
             .query_engine
             .planner()
-            .plan(QueryStatement::Sql(stmt), query_ctx.clone())
+            .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
             .await
             .context(PlanStatementSnafu)?;
         self.query_engine

@@ -29,4 +29,4 @@ futures = "0.3"
 meta-srv = { workspace = true, features = ["mock"] }
 tower.workspace = true
 tracing = "0.1"
-tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+tracing-subscriber.workspace = true

@@ -59,7 +59,7 @@ tokio-stream = { workspace = true, features = ["net"] }
 toml.workspace = true
 tonic.workspace = true
 tower.workspace = true
-typetag = "0.2"
+typetag.workspace = true
 url = "2.3"

 [dev-dependencies]

@@ -69,4 +69,4 @@ common-meta = { workspace = true, features = ["testing"] }
 common-procedure-test.workspace = true
 session.workspace = true
 tracing = "0.1"
-tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+tracing-subscriber.workspace = true

@@ -363,7 +363,7 @@ impl StatementExecutor {

     pub async fn plan(
         &self,
-        stmt: QueryStatement,
+        stmt: &QueryStatement,
         query_ctx: QueryContextRef,
     ) -> Result<LogicalPlan> {
         self.query_engine

@@ -373,6 +373,14 @@
             .context(PlanStatementSnafu)
     }

+    /// Execute [`LogicalPlan`] directly.
+    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
+        self.query_engine
+            .execute(plan, query_ctx)
+            .await
+            .context(ExecLogicalPlanSnafu)
+    }
+
     pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
         self.query_engine
             .planner()

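Splitting planning from execution means callers can hold or transform the plan in between. A sketch of the two-step flow, mirroring the reworked `plan_exec` below and the `optimize_logical_plan` helper already in this file (variable names are illustrative):

    // Sketch: plan, optionally optimize, then execute.
    let plan = statement_executor.plan(&stmt, query_ctx.clone()).await?;
    let plan = statement_executor.optimize_logical_plan(plan)?;
    let output = statement_executor.exec_plan(plan, query_ctx).await?;
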
@@ -382,11 +390,8 @@

     #[tracing::instrument(skip_all)]
     async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
-        let plan = self.plan(stmt, query_ctx.clone()).await?;
-        self.query_engine
-            .execute(plan, query_ctx)
-            .await
-            .context(ExecLogicalPlanSnafu)
+        let plan = self.plan(&stmt, query_ctx.clone()).await?;
+        self.exec_plan(plan, query_ctx).await
     }

     async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {

@@ -391,7 +391,7 @@ impl StatementExecutor {
         let logical_plan = match &*create_view.query {
             Statement::Query(query) => {
                 self.plan(
-                    QueryStatement::Sql(Statement::Query(query.clone())),
+                    &QueryStatement::Sql(Statement::Query(query.clone())),
                     ctx.clone(),
                 )
                 .await?

@@ -90,7 +90,7 @@ impl StatementExecutor {
         };
         self.query_engine
             .planner()
-            .plan(stmt, query_ctx.clone())
+            .plan(&stmt, query_ctx.clone())
             .await
             .context(PlanStatementSnafu)
     }

@@ -570,7 +570,7 @@ mod tests {
         let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
         let plan = engine
             .planner()
-            .plan(stmt, QueryContext::arc())
+            .plan(&stmt, QueryContext::arc())
             .await
             .unwrap();

@@ -592,7 +592,7 @@ mod tests {
         let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
         let plan = engine
             .planner()
-            .plan(stmt, QueryContext::arc())
+            .plan(&stmt, QueryContext::arc())
             .await
             .unwrap();

@@ -671,7 +671,7 @@ mod tests {

         let plan = engine
             .planner()
-            .plan(stmt, QueryContext::arc())
+            .plan(&stmt, QueryContext::arc())
             .await
             .unwrap();

@@ -39,7 +39,7 @@ use crate::{DfContextProviderAdapter, QueryEngineContext};

 #[async_trait]
 pub trait LogicalPlanner: Send + Sync {
-    async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
+    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;

     fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;

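Because `plan` now borrows the statement, callers keep ownership and can reuse it after planning, which is exactly what the frontend needs to hand both the statement and the plan to `pre_execute`. A minimal sketch of the pattern:

    // Sketch: `stmt` stays usable after planning.
    let plan = engine.planner().plan(&stmt, query_ctx.clone()).await?;
    // ... `stmt` can still be logged, inspected, or passed to interceptors here.
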
@@ -61,8 +61,8 @@ impl DfLogicalPlanner {
     }

     #[tracing::instrument(skip_all)]
-    async fn plan_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
-        let df_stmt = (&stmt).try_into().context(SqlSnafu)?;
+    async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
+        let df_stmt = stmt.try_into().context(SqlSnafu)?;

         let table_provider = DfTableSourceProvider::new(
             self.engine_state.catalog_manager().clone(),

@@ -142,7 +142,7 @@ impl DfLogicalPlanner {
     }

     #[tracing::instrument(skip_all)]
-    async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
+    async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
         let plan_decoder = Arc::new(DefaultPlanDecoder::new(
             self.session_state.clone(),
             &query_ctx,

@@ -175,7 +175,7 @@ impl DfLogicalPlanner {
 #[async_trait]
 impl LogicalPlanner for DfLogicalPlanner {
     #[tracing::instrument(skip_all)]
-    async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
+    async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
         match stmt {
             QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await,
             QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await,

@@ -157,29 +157,29 @@ pub struct PromPlanner {
 impl PromPlanner {
     pub async fn stmt_to_plan(
         table_provider: DfTableSourceProvider,
-        stmt: EvalStmt,
+        stmt: &EvalStmt,
         session_state: &SessionState,
     ) -> Result<LogicalPlan> {
         let mut planner = Self {
             table_provider,
-            ctx: PromPlannerContext::from_eval_stmt(&stmt),
+            ctx: PromPlannerContext::from_eval_stmt(stmt),
         };

-        planner.prom_expr_to_plan(stmt.expr, session_state).await
+        planner.prom_expr_to_plan(&stmt.expr, session_state).await
     }

     #[async_recursion]
     pub async fn prom_expr_to_plan(
         &mut self,
-        prom_expr: PromExpr,
+        prom_expr: &PromExpr,
         session_state: &SessionState,
     ) -> Result<LogicalPlan> {
-        let res = match &prom_expr {
+        let res = match prom_expr {
             PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?,
             PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?,
             PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?,
             PromExpr::Paren(ParenExpr { expr }) => {
-                self.prom_expr_to_plan(*expr.clone(), session_state).await?
+                self.prom_expr_to_plan(expr, session_state).await?
             }
             PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu {
                 name: "Prom Subquery",

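The call shape after the change, grounded in the tests later in this diff; the `interval` and `lookback_delta` values are illustrative since the tests elide them:

    // Sketch of a stmt_to_plan call with the new borrowed EvalStmt.
    let eval_stmt = EvalStmt {
        expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
        start: UNIX_EPOCH,
        end: UNIX_EPOCH,
        interval: Duration::from_secs(1),       // illustrative
        lookback_delta: Duration::from_secs(1), // illustrative
    };
    let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &session_state).await?;
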
@@ -212,7 +212,7 @@ impl PromPlanner {
             modifier,
         } = aggr_expr;

-        let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?;
+        let input = self.prom_expr_to_plan(expr, session_state).await?;

         // calculate columns to group by
         // Need to append time index column into group by columns

@@ -242,7 +242,7 @@
     ) -> Result<LogicalPlan> {
         let UnaryExpr { expr } = unary_expr;
         // Unary Expr in PromQL implys the `-` operator
-        let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?;
+        let input = self.prom_expr_to_plan(expr, session_state).await?;
         self.projection_for_each_field_column(input, |col| {
             Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into()))))
         })

@@ -305,7 +305,7 @@
             }
             // lhs is a literal, rhs is a column
             (Some(mut expr), None) => {
-                let input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?;
+                let input = self.prom_expr_to_plan(rhs, session_state).await?;
                 // check if the literal is a special time expr
                 if let Some(time_expr) = Self::try_build_special_time_expr(
                     lhs,

@@ -334,7 +334,7 @@
             }
             // lhs is a column, rhs is a literal
             (None, Some(mut expr)) => {
-                let input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?;
+                let input = self.prom_expr_to_plan(lhs, session_state).await?;
                 // check if the literal is a special time expr
                 if let Some(time_expr) = Self::try_build_special_time_expr(
                     rhs,

@@ -363,14 +363,14 @@
             }
             // both are columns. join them on time index
             (None, None) => {
-                let left_input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?;
+                let left_input = self.prom_expr_to_plan(lhs, session_state).await?;
                 let left_field_columns = self.ctx.field_columns.clone();
                 let mut left_table_ref = self
                     .table_ref()
                     .unwrap_or_else(|_| TableReference::bare(""));
                 let left_context = self.ctx.clone();

-                let right_input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?;
+                let right_input = self.prom_expr_to_plan(rhs, session_state).await?;
                 let right_field_columns = self.ctx.field_columns.clone();
                 let mut right_table_ref = self
                     .table_ref()

@@ -589,7 +589,7 @@
         // transform function arguments
         let args = self.create_function_args(&args.args)?;
         let input = if let Some(prom_expr) = args.input {
-            self.prom_expr_to_plan(prom_expr, session_state).await?
+            self.prom_expr_to_plan(&prom_expr, session_state).await?
         } else {
             self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
             self.ctx.reset_table_name_and_schema();

@@ -628,9 +628,7 @@
         // let promql_parser::parser::ast::Extension { expr } = ext_expr;
         let expr = &ext_expr.expr;
         let children = expr.children();
-        let plan = self
-            .prom_expr_to_plan(children[0].clone(), session_state)
-            .await?;
+        let plan = self.prom_expr_to_plan(&children[0], session_state).await?;
         // Wrapper for the explanation/analyze of the existing plan
         // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain
         // if `analyze` is true, runs the actual plan and produces

@@ -1548,7 +1546,7 @@
             }
         })?;
         let input = args.args[1].as_ref().clone();
-        let input_plan = self.prom_expr_to_plan(input, session_state).await?;
+        let input_plan = self.prom_expr_to_plan(&input, session_state).await?;

         if !self.ctx.has_le_tag() {
             return ColumnNotFoundSnafu {

@@ -1633,9 +1631,7 @@
                 fn_name: SCALAR_FUNCTION
             }
         );
-        let input = self
-            .prom_expr_to_plan(args.args[0].as_ref().clone(), session_state)
-            .await?;
+        let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
         ensure!(
             self.ctx.field_columns.len() == 1,
             MultiFieldsNotSupportedSnafu {

@@ -2420,7 +2416,7 @@ mod test {
             1,
         )
         .await;
-        let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state())
+        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
             .await
             .unwrap();

@@ -2630,10 +2626,9 @@ mod test {
             2,
         )
         .await;
-        let plan =
-            PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone(), &build_session_state())
-                .await
-                .unwrap();
+        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
+            .await
+            .unwrap();
         let expected_no_without = String::from(
             "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
             \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\

@@ -2661,7 +2656,7 @@
             2,
         )
         .await;
-        let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state())
+        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
             .await
             .unwrap();
         let expected_without = String::from(

@@ -2786,7 +2781,7 @@ mod test {
             1,
         )
         .await;
-        let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state())
+        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

@@ -2836,7 +2831,7 @@ mod test {
             1,
         )
         .await;
-        let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state())
+        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
            .await
            .unwrap();

@@ -3080,13 +3075,10 @@ mod test {
             3,
         )
         .await;
-        let plan = PromPlanner::stmt_to_plan(
-            table_provider,
-            eval_stmt.clone(),
-            &build_session_state(),
-        )
-        .await
-        .unwrap();
+        let plan =
+            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
+                .await
+                .unwrap();
         let mut fields = plan.schema().field_names();
         let mut expected = case.1.into_iter().map(String::from).collect::<Vec<_>>();
         fields.sort();

@@ -3108,12 +3100,8 @@ mod test {
             3,
         )
         .await;
-        let plan = PromPlanner::stmt_to_plan(
-            table_provider,
-            eval_stmt.clone(),
-            &build_session_state(),
-        )
-        .await;
+        let plan =
+            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
         assert!(plan.is_err(), "case: {:?}", case);
     }
 }

@@ -3166,7 +3154,7 @@ mod test {
         .await;

         let plan =
-            PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state()).await;
+            PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await;
         assert!(plan.is_err(), "query: {:?}", query);
     }
 }

@@ -3224,7 +3212,7 @@ mod test {
             DummyDecoder::arc(),
             true,
         ),
-        EvalStmt {
+        &EvalStmt {
             expr: parser::parse("metrics{tag = \"1\"}").unwrap(),
             start: UNIX_EPOCH,
             end: UNIX_EPOCH

@@ -3254,7 +3242,7 @@
             DummyDecoder::arc(),
             true,
         ),
-        EvalStmt {
+        &EvalStmt {
             expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(),
             start: UNIX_EPOCH,
             end: UNIX_EPOCH

@@ -634,7 +634,7 @@ mod test {
     async fn do_query(sql: &str) -> Result<LogicalPlan> {
         let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
         let engine = create_test_engine().await;
-        engine.planner().plan(stmt, QueryContext::arc()).await
+        engine.planner().plan(&stmt, QueryContext::arc()).await
     }

     async fn query_plan_compare(sql: &str, expected: String) {

@@ -39,7 +39,7 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
     let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).unwrap();
     let plan = engine
         .planner()
-        .plan(stmt, query_ctx.clone())
+        .plan(&stmt, query_ctx.clone())
         .await
         .unwrap();
     let OutputData::Stream(stream) = engine.execute(plan, query_ctx).await.unwrap().data else {

@@ -134,7 +134,7 @@ async fn test_query_validate() -> Result<()> {
         .unwrap();
     assert!(engine
         .planner()
-        .plan(stmt, QueryContext::arc())
+        .plan(&stmt, QueryContext::arc())
         .await
         .is_ok());

@@ -145,7 +145,7 @@
         .unwrap();
     assert!(engine
         .planner()
-        .plan(stmt, QueryContext::arc())
+        .plan(&stmt, QueryContext::arc())
         .await
         .is_err());
     Ok(())

@@ -315,7 +315,7 @@ impl Script for PyScript {
         let plan = self
             .query_engine
             .planner()
-            .plan(stmt, ctx.query_ctx.clone())
+            .plan(&stmt, ctx.query_ctx.clone())
             .await
             .context(DatabaseQuerySnafu)?;
         let res = self

@@ -414,7 +414,7 @@ impl PyQueryEngine {
             let ctx = Arc::new(QueryContextBuilder::default().build());
             let plan = engine
                 .planner()
-                .plan(stmt, ctx.clone())
+                .plan(&stmt, ctx.clone())
                 .await
                 .map_err(|e| e.to_string())?;
             let res = engine

@@ -70,7 +70,7 @@ impl SqlQueryHandler for DummyInstance {
         let plan = self
             .query_engine
             .planner()
-            .plan(stmt, query_ctx.clone())
+            .plan(&stmt, query_ctx.clone())
             .await
             .unwrap();
         let output = self.query_engine.execute(plan, query_ctx).await.unwrap();

@@ -98,7 +98,7 @@ impl SqlQueryHandler for DummyInstance {
         let plan = self
             .query_engine
             .planner()
-            .plan(QueryStatement::Sql(stmt), query_ctx.clone())
+            .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
             .await
             .unwrap();
         let schema = self.query_engine.describe(plan, query_ctx).await.unwrap();

@@ -542,7 +542,7 @@ CREATE TABLE {table_name} (
     let plan = instance
         .frontend()
         .statement_executor()
-        .plan(stmt, QueryContext::arc())
+        .plan(&stmt, QueryContext::arc())
         .await
         .unwrap();
     let plan = DFLogicalSubstraitConvertor

@@ -236,7 +236,7 @@ mod tests {
         let plan = instance
             .frontend()
             .statement_executor()
-            .plan(stmt, QueryContext::arc())
+            .plan(&stmt, QueryContext::arc())
             .await
             .unwrap();
         let plan = DFLogicalSubstraitConvertor