refactor: query config options (#6781)

* feat: refactor columnar and vector conversion

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: initialize config options from query context

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: failing tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: revert ColumnarValue::try_from_vector

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
dennis zhuang
2025-09-01 15:00:26 +08:00
committed by GitHub
parent d57c0db9e6
commit 1234911ed3
16 changed files with 103 additions and 69 deletions
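
The core of the change: DataFusion ConfigOptions are now derived from the session's QueryContext instead of ConfigOptions::default(), so session settings such as the time zone reach the optimizer and UDF execution. A minimal sketch of the new call shape, using names from the hunks below (illustrative, not a verbatim excerpt):

    let query_ctx: QueryContextRef = QueryContext::arc();
    // Session-aware options replace ConfigOptions::new()/default() at call sites.
    let cfg = query_ctx.create_config_options();
    // Optimizer entry points now take the context explicitly.
    let optimized = apply_df_optimizer(plan, &query_ctx).await?;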

Cargo.lock (generated)

@@ -11635,6 +11635,7 @@ dependencies = [
  "common-session",
  "common-telemetry",
  "common-time",
+ "datafusion-common",
  "derive_builder 0.20.2",
  "derive_more",
  "snafu 0.8.6",


@@ -34,6 +34,33 @@ pub struct ClampFunction;
 const CLAMP_NAME: &str = "clamp";
+
+/// Ensure the vector is not empty and all of its values are identical (i.e., constant).
+fn ensure_constant_vector(vector: &VectorRef) -> Result<()> {
+    ensure!(
+        !vector.is_empty(),
+        InvalidFuncArgsSnafu {
+            err_msg: "Expect at least one value",
+        }
+    );
+    if vector.is_const() {
+        return Ok(());
+    }
+    let first = vector.get_ref(0);
+    for i in 1..vector.len() {
+        let v = vector.get_ref(i);
+        if first != v {
+            return InvalidFuncArgsSnafu {
+                err_msg: "All values in min/max argument must be identical",
+            }
+            .fail();
+        }
+    }
+    Ok(())
+}
+
 impl Function for ClampFunction {
     fn name(&self) -> &str {
         CLAMP_NAME
@@ -80,16 +107,9 @@ impl Function for ClampFunction {
                 ),
             }
         );
-        ensure!(
-            (columns[1].len() == 1 || columns[1].is_const())
-                && (columns[2].len() == 1 || columns[2].is_const()),
-            InvalidFuncArgsSnafu {
-                err_msg: format!(
-                    "The second and third args should be scalar, have: {:?}, {:?}",
-                    columns[1], columns[2]
-                ),
-            }
-        );
+        ensure_constant_vector(&columns[1])?;
+        ensure_constant_vector(&columns[2])?;
         with_match_primitive_type_id!(columns[0].data_type().logical_type_id(), |$S| {
             let input_array = columns[0].to_arrow_array();
@@ -204,15 +224,8 @@ impl Function for ClampMinFunction {
                 ),
             }
         );
-        ensure!(
-            columns[1].len() == 1 || columns[1].is_const(),
-            InvalidFuncArgsSnafu {
-                err_msg: format!(
-                    "The second arg (min) should be scalar, have: {:?}",
-                    columns[1]
-                ),
-            }
-        );
+        ensure_constant_vector(&columns[1])?;
         with_match_primitive_type_id!(columns[0].data_type().logical_type_id(), |$S| {
             let input_array = columns[0].to_arrow_array();
@@ -292,15 +305,8 @@ impl Function for ClampMaxFunction {
                 ),
             }
         );
-        ensure!(
-            columns[1].len() == 1 || columns[1].is_const(),
-            InvalidFuncArgsSnafu {
-                err_msg: format!(
-                    "The second arg (max) should be scalar, have: {:?}",
-                    columns[1]
-                ),
-            }
-        );
+        ensure_constant_vector(&columns[1])?;
         with_match_primitive_type_id!(columns[0].data_type().logical_type_id(), |$S| {
             let input_array = columns[0].to_arrow_array();
@@ -537,8 +543,8 @@ mod test {
         let func = ClampFunction;
         let args = [
             Arc::new(Float64Vector::from(input)) as _,
-            Arc::new(Float64Vector::from_vec(vec![min, min])) as _,
-            Arc::new(Float64Vector::from_vec(vec![max])) as _,
+            Arc::new(Float64Vector::from_vec(vec![min, max])) as _,
+            Arc::new(Float64Vector::from_vec(vec![max, min])) as _,
         ];
         let result = func.eval(&FunctionContext::default(), args.as_slice());
         assert!(result.is_err());
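
The new check relaxes the old "must be scalar" rule: a min/max argument may have any length as long as it is a constant vector or all of its values are identical. A small sketch of the accepted and rejected shapes, reusing names from the hunks above (illustrative only):

    // Accepted: repeated identical values (and, as before, length-1 or const vectors).
    let min_ok = Arc::new(Float64Vector::from_vec(vec![1.0, 1.0])) as VectorRef;
    assert!(ensure_constant_vector(&min_ok).is_ok());
    // Rejected: mixed values ("All values in min/max argument must be identical").
    let min_bad = Arc::new(Float64Vector::from_vec(vec![1.0, 2.0])) as VectorRef;
    assert!(ensure_constant_vector(&min_bad).is_err());
    // Rejected: empty input ("Expect at least one value").
    let empty = Arc::new(Float64Vector::from_vec(Vec::<f64>::new())) as VectorRef;
    assert!(ensure_constant_vector(&empty).is_err());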


@@ -16,15 +16,12 @@ use std::any::Any;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
-use common_query::error::FromScalarValueSnafu;
 use common_query::prelude::ColumnarValue;
 use datafusion::logical_expr::{ScalarFunctionArgs, ScalarUDFImpl};
 use datafusion_expr::ScalarUDF;
 use datatypes::data_type::DataType;
 use datatypes::prelude::*;
-use datatypes::vectors::Helper;
 use session::context::QueryContextRef;
-use snafu::ResultExt;
 use crate::function::{FunctionContext, FunctionRef};
 use crate::state::FunctionState;
@@ -76,13 +73,7 @@ impl ScalarUDFImpl for ScalarUdf {
         let columns = args
             .args
             .iter()
-            .map(|x| {
-                ColumnarValue::try_from(x).and_then(|y| match y {
-                    ColumnarValue::Vector(z) => Ok(z),
-                    ColumnarValue::Scalar(z) => Helper::try_from_scalar_value(z, args.number_rows)
-                        .context(FromScalarValueSnafu),
-                })
-            })
+            .map(|x| ColumnarValue::try_from(x).and_then(|y| y.try_into_vector(args.number_rows)))
             .collect::<common_query::error::Result<Vec<_>>>()?;
         let v = self
             .function
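
The deleted closure logic is now encapsulated by ColumnarValue::try_into_vector. Judging from the removed match arms above, the helper presumably behaves like this sketch (the real implementation lives in common_query and may differ in detail):

    impl ColumnarValue {
        /// Converts to a VectorRef, materializing a scalar into `num_rows` rows.
        pub fn try_into_vector(self, num_rows: usize) -> common_query::error::Result<VectorRef> {
            match self {
                ColumnarValue::Vector(v) => Ok(v),
                ColumnarValue::Scalar(s) => {
                    Helper::try_from_scalar_value(s, num_rows).context(FromScalarValueSnafu)
                }
            }
        }
    }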


@@ -128,6 +128,10 @@ impl Helper {
             ScalarValue::Boolean(v) => {
                 ConstantVector::new(Arc::new(BooleanVector::from(vec![v])), length)
             }
+            ScalarValue::Float16(v) => ConstantVector::new(
+                Arc::new(Float32Vector::from(vec![v.map(f32::from)])),
+                length,
+            ),
             ScalarValue::Float32(v) => {
                 ConstantVector::new(Arc::new(Float32Vector::from(vec![v])), length)
             }
@@ -243,7 +247,6 @@ impl Helper {
             | ScalarValue::LargeList(_)
             | ScalarValue::Dictionary(_, _)
             | ScalarValue::Union(_, _, _)
-            | ScalarValue::Float16(_)
             | ScalarValue::Utf8View(_)
             | ScalarValue::BinaryView(_)
             | ScalarValue::Map(_)
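
Float16 scalars were previously routed to the unsupported-type arm; they are now widened to a constant Float32 vector (half::f16 implements Into<f32>). A usage sketch, assuming the match above is the body of Helper::try_from_scalar_value:

    use half::f16;

    // Widened to a length-3 constant vector of Float32 values.
    let scalar = ScalarValue::Float16(Some(f16::from_f32(1.5)));
    let vector = Helper::try_from_scalar_value(scalar, 3)?;
    assert_eq!(vector.len(), 3);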


@@ -714,7 +714,7 @@ impl BatchingTask {
             })?
             .data;
         // only apply optimize after complex rewrite is done
-        let new_plan = apply_df_optimizer(rewrite).await?;
+        let new_plan = apply_df_optimizer(rewrite, &query_ctx).await?;
         let info = PlanInfo {
             plan: new_plan.clone(),


@@ -122,13 +122,13 @@ pub async fn sql_to_df_plan(
     };
     let plan = engine
         .planner()
-        .plan(&query_stmt, query_ctx)
+        .plan(&query_stmt, query_ctx.clone())
         .await
         .map_err(BoxedError::new)
         .context(ExternalSnafu)?;
     let plan = if optimize {
-        apply_df_optimizer(plan).await?
+        apply_df_optimizer(plan, &query_ctx).await?
     } else {
         plan
     };


@@ -44,6 +44,7 @@ use query::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
 use query::parser::QueryLanguageParser;
 use query::query_engine::DefaultSerializer;
 use query::QueryEngine;
+use session::context::QueryContextRef;
 use snafu::ResultExt;
 /// note here we are using the `substrait_proto_df` crate from the `substrait` module and
 /// rename it to `substrait_proto`
@@ -57,8 +58,9 @@ use crate::plan::TypedPlan;
 // TODO(discord9): use `Analyzer` to manage rules if more `AnalyzerRule` is needed
 pub async fn apply_df_optimizer(
     plan: datafusion_expr::LogicalPlan,
+    query_ctx: &QueryContextRef,
 ) -> Result<datafusion_expr::LogicalPlan, Error> {
-    let cfg = ConfigOptions::new();
+    let cfg = query_ctx.create_config_options();
     let analyzer = Analyzer::with_rules(vec![
         Arc::new(CountWildcardToTimeIndexRule),
         Arc::new(AvgExpandRule),
@@ -107,12 +109,12 @@ pub async fn sql_to_flow_plan(
         .context(ExternalSnafu)?;
     let plan = engine
         .planner()
-        .plan(&stmt, query_ctx)
+        .plan(&stmt, query_ctx.clone())
         .await
         .map_err(BoxedError::new)
         .context(ExternalSnafu)?;
-    let opted_plan = apply_df_optimizer(plan).await?;
+    let opted_plan = apply_df_optimizer(plan, &query_ctx).await?;
     // TODO(discord9): add df optimization
     let sub_plan = DFLogicalSubstraitConvertor {}

@@ -172,7 +172,9 @@ pub async fn sql_to_substrait(engine: Arc<dyn QueryEngine>, sql: &str) -> proto:
         .plan(&stmt, QueryContext::arc())
         .await
         .unwrap();
-    let plan = apply_df_optimizer(plan).await.unwrap();
+    let plan = apply_df_optimizer(plan, &QueryContext::arc())
+        .await
+        .unwrap();
     // encode then decode so to rely on the impl of conversion from logical plan to substrait plan
     let bytes = DFLogicalSubstraitConvertor {}


@@ -293,7 +293,9 @@ mod test {
         .plan(&stmt, QueryContext::arc())
         .await
         .unwrap();
-    let plan = apply_df_optimizer(plan).await.unwrap();
+    let plan = apply_df_optimizer(plan, &QueryContext::arc())
+        .await
+        .unwrap();
     // encode then decode so to rely on the impl of conversion from logical plan to substrait plan
     let bytes = DFLogicalSubstraitConvertor {}
@@ -315,7 +317,7 @@ mod test {
         .plan(&stmt, QueryContext::arc())
         .await
         .unwrap();
-    let plan = apply_df_optimizer(plan).await;
+    let plan = apply_df_optimizer(plan, &QueryContext::arc()).await;
     assert!(plan.is_err());
 }


@@ -19,6 +19,7 @@ use common_error::define_into_tonic_status;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
+use common_query::error::Error as QueryResult;
 use datafusion::parquet;
 use datafusion_common::DataFusionError;
 use datatypes::arrow::error::ArrowError;
@@ -36,6 +37,14 @@ pub enum Error {
         location: Location,
     },
+    #[snafu(display("Failed to cast result: `{}`", source))]
+    Cast {
+        #[snafu(source)]
+        source: QueryResult,
+        #[snafu(implicit)]
+        location: Location,
+    },
     #[snafu(display("View already exists: `{name}`"))]
     ViewAlreadyExists {
         name: String,
@@ -870,6 +879,7 @@ pub type Result<T> = std::result::Result<T, Error>;
 impl ErrorExt for Error {
     fn status_code(&self) -> StatusCode {
         match self {
+            Error::Cast { source, .. } => source.status_code(),
             Error::InvalidSql { .. }
             | Error::InvalidConfigValue { .. }
             | Error::InvalidInsertRequest { .. }


@@ -32,7 +32,7 @@ use snafu::{ensure, OptionExt, ResultExt};
 use sql::ast::{Expr, FunctionArg, FunctionArgExpr, FunctionArguments, Value as SqlValue};
 use sql::statements::admin::Admin;
-use crate::error::{self, ExecuteAdminFunctionSnafu, IntoVectorsSnafu, Result};
+use crate::error::{self, CastSnafu, ExecuteAdminFunctionSnafu, Result};
 use crate::statement::StatementExecutor;
 const DUMMY_COLUMN: &str = "<dummy>";
@@ -118,7 +118,7 @@ impl StatementExecutor {
                 .collect(),
             return_field: Arc::new(arrow::datatypes::Field::new("result", ret_type, true)),
             number_rows: if args.is_empty() { 1 } else { args[0].len() },
-            config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
+            config_options: Arc::new(query_ctx.create_config_options()),
         };
         // Execute the async UDF
@@ -134,22 +134,11 @@ impl StatementExecutor {
             })?;
         // Convert result back to VectorRef
-        let result = match result_columnar {
-            datafusion_expr::ColumnarValue::Array(array) => {
-                datatypes::vectors::Helper::try_into_vector(array).context(IntoVectorsSnafu)?
-            }
-            datafusion_expr::ColumnarValue::Scalar(scalar) => {
-                let array =
-                    scalar
-                        .to_array_of_size(1)
-                        .with_context(|_| ExecuteAdminFunctionSnafu {
-                            msg: format!("Failed to convert scalar to array for {}", fn_name),
-                        })?;
-                datatypes::vectors::Helper::try_into_vector(array).context(IntoVectorsSnafu)?
-            }
-        };
-        let result_vector: VectorRef = result;
+        let result_columnar: common_query::prelude::ColumnarValue =
+            (&result_columnar).try_into().context(CastSnafu)?;
+        let result_vector: VectorRef = result_columnar.try_into_vector(1).context(CastSnafu)?;
         let column_schemas = vec![ColumnSchema::new(
             // Use statement as the result column name
             stmt.to_string(),


@@ -537,6 +537,9 @@ impl QueryEngine for DatafusionQueryEngine {
             }
         }
+        // configure execution options
+        state.config_mut().options_mut().execution.time_zone = query_ctx.timezone().to_string();
+
         // usually it's impossible to have both `set variable` set by sql client and
         // hint in header by grpc client, so only need to deal with them separately
         if query_ctx.configuration_parameter().allow_query_fallback() {


@@ -22,6 +22,7 @@ common-recordbatch.workspace = true
 common-session.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
+datafusion-common.workspace = true
 derive_builder.workspace = true
 derive_more = { version = "1", default-features = false, features = ["debug"] }
 snafu.workspace = true


@@ -28,6 +28,7 @@ use common_recordbatch::cursor::RecordBatchStreamCursor;
 use common_telemetry::warn;
 use common_time::timezone::parse_timezone;
 use common_time::Timezone;
+use datafusion_common::config::ConfigOptions;
 use derive_builder::Builder;
 use sql::dialect::{Dialect, GenericDialect, GreptimeDbDialect, MySqlDialect, PostgreSqlDialect};
@@ -221,6 +222,13 @@ impl QueryContext {
         Arc::new(QueryContextBuilder::default().build())
     }
+
+    /// Creates a new DataFusion `ConfigOptions` instance based on the current `QueryContext`.
+    pub fn create_config_options(&self) -> ConfigOptions {
+        let mut config = ConfigOptions::default();
+        config.execution.time_zone = self.timezone().to_string();
+        config
+    }
+
     pub fn with(catalog: &str, schema: &str) -> QueryContext {
         QueryContextBuilder::default()
             .current_catalog(catalog.to_string())
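
Both the flow optimizer and the admin UDF path above consume this method; a brief usage sketch (the assertion assumes execution.time_zone is a plain String in this DataFusion version, as the setter above implies):

    let ctx = QueryContext::arc();
    let opts = ctx.create_config_options();
    assert_eq!(opts.execution.time_zone, ctx.timezone().to_string());
    // As in the admin UDF path, wrapped for ScalarFunctionArgs:
    let config_options = Arc::new(ctx.create_config_options());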


@@ -76,7 +76,13 @@ SELECT CLAMP(0.5, 0, 1);
 SELECT CLAMP(10, 1, 0);
-Error: 3001(EngineExecuteQuery), Invalid function args: The second arg should be less than or equal to the third arg, have: ConstantVector([Int64(1); 1]), ConstantVector([Int64(0); 1])
+Error: 3001(EngineExecuteQuery), Invalid function args: The second arg should be less than or equal to the third arg, have: PrimitiveVector { array: PrimitiveArray<Int64>
+[
+  1,
+] }, PrimitiveVector { array: PrimitiveArray<Int64>
+[
+  0,
+] }
 SELECT CLAMP_MIN(10, 12);


@@ -375,7 +375,17 @@ TQL EVAL (0, 15, '5s') clamp(host, 6 - 6, 6 + 6);
 -- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') clamp(host, 12, 0);
-Error: 3001(EngineExecuteQuery), Invalid function args: The second arg should be less than or equal to the third arg, have: ConstantVector([Float64(12.0); 3]), ConstantVector([Float64(0.0); 3])
+Error: 3001(EngineExecuteQuery), Invalid function args: The second arg should be less than or equal to the third arg, have: PrimitiveVector { array: PrimitiveArray<Float64>
+[
+  12.0,
+  12.0,
+  12.0,
+] }, PrimitiveVector { array: PrimitiveArray<Float64>
+[
+  0.0,
+  0.0,
+  0.0,
+] }
 -- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') clamp(host{host="host1"}, -1, 6);