diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 49dadc545a..f8dec09114 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -47,7 +47,7 @@ use tokio::io::AsyncWrite; use crate::error::{self, DataFrameSnafu, InvalidPrepareStatementSnafu, Result}; use crate::metrics::METRIC_AUTH_FAILURE; use crate::mysql::helper::{ - self, format_placeholder, replace_placeholders, transform_placeholders, + self, fix_placeholder_types, format_placeholder, replace_placeholders, transform_placeholders, }; use crate::mysql::writer; use crate::mysql::writer::{create_mysql_column, handle_err}; @@ -183,7 +183,7 @@ impl MysqlInstanceShim { let describe_result = self .do_describe(statement.clone(), query_ctx.clone()) .await?; - let (plan, schema) = if let Some(DescribeResult { + let (mut plan, schema) = if let Some(DescribeResult { logical_plan, schema, }) = describe_result @@ -193,7 +193,8 @@ impl MysqlInstanceShim { (None, None) }; - let params = if let Some(plan) = &plan { + let params = if let Some(plan) = &mut plan { + fix_placeholder_types(plan)?; prepared_params( &plan .get_parameter_types() @@ -258,7 +259,8 @@ impl MysqlInstanceShim { }; let outputs = match sql_plan.plan { - Some(plan) => { + Some(mut plan) => { + fix_placeholder_types(&mut plan)?; let param_types = plan .get_parameter_types() .context(DataFrameSnafu)? @@ -295,6 +297,10 @@ impl MysqlInstanceShim { } Params::CliParams(params) => params.iter().map(|x| x.to_string()).collect(), }; + debug!( + "do_execute Replacing with Params: {:?}, Original Query: {}", + param_strs, sql_plan.query + ); let query = replace_params(param_strs, sql_plan.query); debug!("Mysql execute replaced query: {}", query); self.do_query(&query, query_ctx.clone()).await @@ -412,6 +418,7 @@ impl AsyncMysqlShim for MysqlInstanceShi let (params, columns) = self .do_prepare(raw_query, query_ctx.clone(), stmt_key) .await?; + debug!("on_prepare: Params: {:?}, Columns: {:?}", params, columns); w.reply(stmt_id, ¶ms, &columns).await?; crate::metrics::METRIC_MYSQL_PREPARED_COUNT .with_label_values(&[query_ctx.get_db_string().as_str()]) @@ -641,12 +648,13 @@ fn replace_params_with_values( debug_assert_eq!(param_types.len(), params.len()); debug!( - "replace_params_with_values(param_types: {:#?}, params: {:#?})", + "replace_params_with_values(param_types: {:#?}, params: {:#?}, plan: {:#?})", param_types, params .iter() .map(|x| format!("({:?}, {:?})", x.value, x.coltype)) - .join(", ") + .join(", "), + plan ); let mut values = Vec::with_capacity(params.len()); @@ -672,9 +680,10 @@ fn replace_params_with_exprs( debug_assert_eq!(param_types.len(), params.len()); debug!( - "replace_params_with_exprs(param_types: {:#?}, params: {:#?})", + "replace_params_with_exprs(param_types: {:#?}, params: {:#?}, plan: {:#?})", param_types, - params.iter().map(|x| format!("({:?})", x)).join(", ") + params.iter().map(|x| format!("({:?})", x)).join(", "), + plan ); let mut values = Vec::with_capacity(params.len()); diff --git a/src/servers/src/mysql/helper.rs b/src/servers/src/mysql/helper.rs index 2e719a83c2..bf2d5688d0 100644 --- a/src/servers/src/mysql/helper.rs +++ b/src/servers/src/mysql/helper.rs @@ -18,6 +18,8 @@ use std::time::Duration; use chrono::NaiveDate; use common_query::prelude::ScalarValue; use common_time::Timestamp; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_expr::LogicalPlan; use datatypes::prelude::ConcreteDataType; use datatypes::types::TimestampType; use datatypes::value::{self, Value}; @@ -28,7 +30,7 @@ use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut}; use sql::statements::sql_value_to_value; use sql::statements::statement::Statement; -use crate::error::{self, Result}; +use crate::error::{self, DataFusionSnafu, Result}; /// Returns the placeholder string "$i". pub fn format_placeholder(i: usize) -> String { @@ -77,6 +79,47 @@ pub fn transform_placeholders(stmt: Statement) -> Statement { } } +/// Give placeholder in skip and limit `int64` data type if it is not specified +/// +/// because it seems datafusion will not give data type to placeholder if it's in limit/skip position, still unknown if this is a feature or a bug. And if a placeholder expr have no data type, datafusion will fail to extract it using `LogicalPlan::get_parameter_types` +pub fn fix_placeholder_types(plan: &mut LogicalPlan) -> Result<()> { + let give_placeholder_types = |mut e| { + if let datafusion_expr::Expr::Placeholder(ph) = &mut e { + if ph.data_type.is_none() { + ph.data_type = Some(arrow_schema::DataType::Int64); + Ok(Transformed::yes(e)) + } else { + Ok(Transformed::no(e)) + } + } else { + Ok(Transformed::no(e)) + } + }; + *plan = std::mem::take(plan) + .transform(|p| { + let LogicalPlan::Limit(mut limit) = p else { + return Ok(Transformed::no(p)); + }; + + if let Some(fetch) = &mut limit.fetch { + *fetch = Box::new( + std::mem::take(fetch) + .transform(give_placeholder_types)? + .data, + ); + } + + if let Some(skip) = &mut limit.skip { + *skip = Box::new(std::mem::take(skip).transform(give_placeholder_types)?.data); + } + + Ok(Transformed::yes(LogicalPlan::Limit(limit))) + }) + .context(DataFusionSnafu)? + .data; + Ok(()) +} + fn visit_placeholders(v: &mut V) where V: VisitMut, diff --git a/tests/cases/standalone/common/prepare/mysql_prepare.result b/tests/cases/standalone/common/prepare/mysql_prepare.result index 31fa6a9c9a..eb227ff0d6 100644 --- a/tests/cases/standalone/common/prepare/mysql_prepare.result +++ b/tests/cases/standalone/common/prepare/mysql_prepare.result @@ -70,3 +70,62 @@ DEALLOCATE stmt; affected_rows: 0 +-- test if placeholder at limit and offset are parsed correctly +-- SQLNESS PROTOCOL MYSQL +PREPARE stmt FROM 'SELECT 1 LIMIT ? OFFSET ?;'; + +affected_rows: 0 + +-- SQLNESS PROTOCOL MYSQL +EXECUTE stmt USING 1, 2; + +affected_rows: 0 + +-- SQLNESS PROTOCOL MYSQL +DEALLOCATE stmt; + +affected_rows: 0 + +-- test with data +CREATE TABLE IF NOT EXISTS "cake" ( + `domain` STRING, + is_expire BOOLEAN NULL, + ts TIMESTAMP(3), + TIME INDEX ("ts"), + PRIMARY KEY ("domain") +) ENGINE=mito +WITH( + append_mode = 'true', + ttl='7days' +); + +Affected Rows: 0 + +INSERT INTO cake(domain, is_expire, ts) VALUES('happy', false, '2025-03-18 12:55:51.758000'); + +Affected Rows: 1 + +-- SQLNESS PROTOCOL MYSQL +PREPARE stmt FROM 'SELECT `cake`.`domain`, `cake`.`is_expire`, `cake`.`ts` FROM `cake` WHERE `cake`.`domain` = ? LIMIT ? OFFSET ?'; + +affected_rows: 0 + +-- SQLNESS PROTOCOL MYSQL +EXECUTE stmt USING 'happy', 42, 0; + ++--------+-----------+----------------------------+ +| domain | is_expire | ts | ++--------+-----------+----------------------------+ +| happy | 0 | 2025-03-18 12:55:51.758000 | ++--------+-----------+----------------------------+ + +-- SQLNESS PROTOCOL MYSQL +DEALLOCATE stmt; + +affected_rows: 0 + +-- SQLNESS PROTOCOL MYSQL +DROP TABLE cake; + +affected_rows: 0 + diff --git a/tests/cases/standalone/common/prepare/mysql_prepare.sql b/tests/cases/standalone/common/prepare/mysql_prepare.sql index da1681a790..8e80a0a867 100644 --- a/tests/cases/standalone/common/prepare/mysql_prepare.sql +++ b/tests/cases/standalone/common/prepare/mysql_prepare.sql @@ -37,3 +37,40 @@ EXECUTE stmt USING 1, 'hello'; -- SQLNESS PROTOCOL MYSQL DEALLOCATE stmt; + +-- test if placeholder at limit and offset are parsed correctly +-- SQLNESS PROTOCOL MYSQL +PREPARE stmt FROM 'SELECT 1 LIMIT ? OFFSET ?;'; + +-- SQLNESS PROTOCOL MYSQL +EXECUTE stmt USING 1, 2; + +-- SQLNESS PROTOCOL MYSQL +DEALLOCATE stmt; + +-- test with data +CREATE TABLE IF NOT EXISTS "cake" ( + `domain` STRING, + is_expire BOOLEAN NULL, + ts TIMESTAMP(3), + TIME INDEX ("ts"), + PRIMARY KEY ("domain") +) ENGINE=mito +WITH( + append_mode = 'true', + ttl='7days' +); + +INSERT INTO cake(domain, is_expire, ts) VALUES('happy', false, '2025-03-18 12:55:51.758000'); + +-- SQLNESS PROTOCOL MYSQL +PREPARE stmt FROM 'SELECT `cake`.`domain`, `cake`.`is_expire`, `cake`.`ts` FROM `cake` WHERE `cake`.`domain` = ? LIMIT ? OFFSET ?'; + +-- SQLNESS PROTOCOL MYSQL +EXECUTE stmt USING 'happy', 42, 0; + +-- SQLNESS PROTOCOL MYSQL +DEALLOCATE stmt; + +-- SQLNESS PROTOCOL MYSQL +DROP TABLE cake;