mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 12:00:40 +00:00
fix: mysql prepare limit&offset param (#5734)
* fix: prepare limit&offset param * test: sqlness * chore: per review * chore: per review
This commit is contained in:
@@ -47,7 +47,7 @@ use tokio::io::AsyncWrite;
|
||||
use crate::error::{self, DataFrameSnafu, InvalidPrepareStatementSnafu, Result};
|
||||
use crate::metrics::METRIC_AUTH_FAILURE;
|
||||
use crate::mysql::helper::{
|
||||
self, format_placeholder, replace_placeholders, transform_placeholders,
|
||||
self, fix_placeholder_types, format_placeholder, replace_placeholders, transform_placeholders,
|
||||
};
|
||||
use crate::mysql::writer;
|
||||
use crate::mysql::writer::{create_mysql_column, handle_err};
|
||||
@@ -183,7 +183,7 @@ impl MysqlInstanceShim {
|
||||
let describe_result = self
|
||||
.do_describe(statement.clone(), query_ctx.clone())
|
||||
.await?;
|
||||
let (plan, schema) = if let Some(DescribeResult {
|
||||
let (mut plan, schema) = if let Some(DescribeResult {
|
||||
logical_plan,
|
||||
schema,
|
||||
}) = describe_result
|
||||
@@ -193,7 +193,8 @@ impl MysqlInstanceShim {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let params = if let Some(plan) = &plan {
|
||||
let params = if let Some(plan) = &mut plan {
|
||||
fix_placeholder_types(plan)?;
|
||||
prepared_params(
|
||||
&plan
|
||||
.get_parameter_types()
|
||||
@@ -258,7 +259,8 @@ impl MysqlInstanceShim {
|
||||
};
|
||||
|
||||
let outputs = match sql_plan.plan {
|
||||
Some(plan) => {
|
||||
Some(mut plan) => {
|
||||
fix_placeholder_types(&mut plan)?;
|
||||
let param_types = plan
|
||||
.get_parameter_types()
|
||||
.context(DataFrameSnafu)?
|
||||
@@ -295,6 +297,10 @@ impl MysqlInstanceShim {
|
||||
}
|
||||
Params::CliParams(params) => params.iter().map(|x| x.to_string()).collect(),
|
||||
};
|
||||
debug!(
|
||||
"do_execute Replacing with Params: {:?}, Original Query: {}",
|
||||
param_strs, sql_plan.query
|
||||
);
|
||||
let query = replace_params(param_strs, sql_plan.query);
|
||||
debug!("Mysql execute replaced query: {}", query);
|
||||
self.do_query(&query, query_ctx.clone()).await
|
||||
@@ -412,6 +418,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
|
||||
let (params, columns) = self
|
||||
.do_prepare(raw_query, query_ctx.clone(), stmt_key)
|
||||
.await?;
|
||||
debug!("on_prepare: Params: {:?}, Columns: {:?}", params, columns);
|
||||
w.reply(stmt_id, ¶ms, &columns).await?;
|
||||
crate::metrics::METRIC_MYSQL_PREPARED_COUNT
|
||||
.with_label_values(&[query_ctx.get_db_string().as_str()])
|
||||
@@ -641,12 +648,13 @@ fn replace_params_with_values(
|
||||
debug_assert_eq!(param_types.len(), params.len());
|
||||
|
||||
debug!(
|
||||
"replace_params_with_values(param_types: {:#?}, params: {:#?})",
|
||||
"replace_params_with_values(param_types: {:#?}, params: {:#?}, plan: {:#?})",
|
||||
param_types,
|
||||
params
|
||||
.iter()
|
||||
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
|
||||
.join(", ")
|
||||
.join(", "),
|
||||
plan
|
||||
);
|
||||
|
||||
let mut values = Vec::with_capacity(params.len());
|
||||
@@ -672,9 +680,10 @@ fn replace_params_with_exprs(
|
||||
debug_assert_eq!(param_types.len(), params.len());
|
||||
|
||||
debug!(
|
||||
"replace_params_with_exprs(param_types: {:#?}, params: {:#?})",
|
||||
"replace_params_with_exprs(param_types: {:#?}, params: {:#?}, plan: {:#?})",
|
||||
param_types,
|
||||
params.iter().map(|x| format!("({:?})", x)).join(", ")
|
||||
params.iter().map(|x| format!("({:?})", x)).join(", "),
|
||||
plan
|
||||
);
|
||||
|
||||
let mut values = Vec::with_capacity(params.len());
|
||||
|
||||
@@ -18,6 +18,8 @@ use std::time::Duration;
|
||||
use chrono::NaiveDate;
|
||||
use common_query::prelude::ScalarValue;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode};
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::TimestampType;
|
||||
use datatypes::value::{self, Value};
|
||||
@@ -28,7 +30,7 @@ use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut};
|
||||
use sql::statements::sql_value_to_value;
|
||||
use sql::statements::statement::Statement;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::error::{self, DataFusionSnafu, Result};
|
||||
|
||||
/// Returns the placeholder string "$i".
|
||||
pub fn format_placeholder(i: usize) -> String {
|
||||
@@ -77,6 +79,47 @@ pub fn transform_placeholders(stmt: Statement) -> Statement {
|
||||
}
|
||||
}
|
||||
|
||||
/// Give placeholder in skip and limit `int64` data type if it is not specified
|
||||
///
|
||||
/// because it seems datafusion will not give data type to placeholder if it's in limit/skip position, still unknown if this is a feature or a bug. And if a placeholder expr have no data type, datafusion will fail to extract it using `LogicalPlan::get_parameter_types`
|
||||
pub fn fix_placeholder_types(plan: &mut LogicalPlan) -> Result<()> {
|
||||
let give_placeholder_types = |mut e| {
|
||||
if let datafusion_expr::Expr::Placeholder(ph) = &mut e {
|
||||
if ph.data_type.is_none() {
|
||||
ph.data_type = Some(arrow_schema::DataType::Int64);
|
||||
Ok(Transformed::yes(e))
|
||||
} else {
|
||||
Ok(Transformed::no(e))
|
||||
}
|
||||
} else {
|
||||
Ok(Transformed::no(e))
|
||||
}
|
||||
};
|
||||
*plan = std::mem::take(plan)
|
||||
.transform(|p| {
|
||||
let LogicalPlan::Limit(mut limit) = p else {
|
||||
return Ok(Transformed::no(p));
|
||||
};
|
||||
|
||||
if let Some(fetch) = &mut limit.fetch {
|
||||
*fetch = Box::new(
|
||||
std::mem::take(fetch)
|
||||
.transform(give_placeholder_types)?
|
||||
.data,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(skip) = &mut limit.skip {
|
||||
*skip = Box::new(std::mem::take(skip).transform(give_placeholder_types)?.data);
|
||||
}
|
||||
|
||||
Ok(Transformed::yes(LogicalPlan::Limit(limit)))
|
||||
})
|
||||
.context(DataFusionSnafu)?
|
||||
.data;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn visit_placeholders<V>(v: &mut V)
|
||||
where
|
||||
V: VisitMut,
|
||||
|
||||
@@ -70,3 +70,62 @@ DEALLOCATE stmt;
|
||||
|
||||
affected_rows: 0
|
||||
|
||||
-- test if placeholder at limit and offset are parsed correctly
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
PREPARE stmt FROM 'SELECT 1 LIMIT ? OFFSET ?;';
|
||||
|
||||
affected_rows: 0
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
EXECUTE stmt USING 1, 2;
|
||||
|
||||
affected_rows: 0
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
DEALLOCATE stmt;
|
||||
|
||||
affected_rows: 0
|
||||
|
||||
-- test with data
|
||||
CREATE TABLE IF NOT EXISTS "cake" (
|
||||
`domain` STRING,
|
||||
is_expire BOOLEAN NULL,
|
||||
ts TIMESTAMP(3),
|
||||
TIME INDEX ("ts"),
|
||||
PRIMARY KEY ("domain")
|
||||
) ENGINE=mito
|
||||
WITH(
|
||||
append_mode = 'true',
|
||||
ttl='7days'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO cake(domain, is_expire, ts) VALUES('happy', false, '2025-03-18 12:55:51.758000');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
PREPARE stmt FROM 'SELECT `cake`.`domain`, `cake`.`is_expire`, `cake`.`ts` FROM `cake` WHERE `cake`.`domain` = ? LIMIT ? OFFSET ?';
|
||||
|
||||
affected_rows: 0
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
EXECUTE stmt USING 'happy', 42, 0;
|
||||
|
||||
+--------+-----------+----------------------------+
|
||||
| domain | is_expire | ts |
|
||||
+--------+-----------+----------------------------+
|
||||
| happy | 0 | 2025-03-18 12:55:51.758000 |
|
||||
+--------+-----------+----------------------------+
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
DEALLOCATE stmt;
|
||||
|
||||
affected_rows: 0
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
DROP TABLE cake;
|
||||
|
||||
affected_rows: 0
|
||||
|
||||
|
||||
@@ -37,3 +37,40 @@ EXECUTE stmt USING 1, 'hello';
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
DEALLOCATE stmt;
|
||||
|
||||
-- test if placeholder at limit and offset are parsed correctly
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
PREPARE stmt FROM 'SELECT 1 LIMIT ? OFFSET ?;';
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
EXECUTE stmt USING 1, 2;
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
DEALLOCATE stmt;
|
||||
|
||||
-- test with data
|
||||
CREATE TABLE IF NOT EXISTS "cake" (
|
||||
`domain` STRING,
|
||||
is_expire BOOLEAN NULL,
|
||||
ts TIMESTAMP(3),
|
||||
TIME INDEX ("ts"),
|
||||
PRIMARY KEY ("domain")
|
||||
) ENGINE=mito
|
||||
WITH(
|
||||
append_mode = 'true',
|
||||
ttl='7days'
|
||||
);
|
||||
|
||||
INSERT INTO cake(domain, is_expire, ts) VALUES('happy', false, '2025-03-18 12:55:51.758000');
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
PREPARE stmt FROM 'SELECT `cake`.`domain`, `cake`.`is_expire`, `cake`.`ts` FROM `cake` WHERE `cake`.`domain` = ? LIMIT ? OFFSET ?';
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
EXECUTE stmt USING 'happy', 42, 0;
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
DEALLOCATE stmt;
|
||||
|
||||
-- SQLNESS PROTOCOL MYSQL
|
||||
DROP TABLE cake;
|
||||
|
||||
Reference in New Issue
Block a user