mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-19 06:20:38 +00:00
fix: correctly decode mysql timestamp binary value (#2946)
* fix: correctly decode mysql timestamp binary value * fix: ut * fix: resolve PR comments
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -5477,8 +5477,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "opensrv-mysql"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6b6a785aafb26a97c26078b9457e96cb238b386781583783a3a3d3de47fa841"
|
||||
source = "git+https://github.com/MichaelScofield/opensrv.git?rev=1676c1d#1676c1d166cf33e7745e1b5db54b3fe2b7defec9"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
|
||||
@@ -54,7 +54,8 @@ lazy_static.workspace = true
|
||||
mime_guess = "2.0"
|
||||
once_cell.workspace = true
|
||||
openmetrics-parser = "0.4"
|
||||
opensrv-mysql = "0.6"
|
||||
# TODO(LFC): Wait for https://github.com/datafuselabs/opensrv/pull/60
|
||||
opensrv-mysql = { git = "https://github.com/MichaelScofield/opensrv.git", rev = "1676c1d" }
|
||||
opentelemetry-proto.workspace = true
|
||||
parking_lot = "0.12"
|
||||
pgwire = "0.17"
|
||||
|
||||
@@ -408,6 +408,9 @@ pub enum Error {
|
||||
error: FromUtf8Error,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert Mysql value, error: {}", err_msg))]
|
||||
MysqlValueConversion { err_msg: String, location: Location },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -460,7 +463,8 @@ impl ErrorExt for Error {
|
||||
| PreparedStmtTypeMismatch { .. }
|
||||
| TimePrecision { .. }
|
||||
| UrlDecode { .. }
|
||||
| IncompatibleSchema { .. } => StatusCode::InvalidArguments,
|
||||
| IncompatibleSchema { .. }
|
||||
| MysqlValueConversion { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
InfluxdbLinesWrite { source, .. }
|
||||
| PromSeriesWrite { source, .. }
|
||||
|
||||
@@ -24,7 +24,7 @@ use chrono::{NaiveDate, NaiveDateTime};
|
||||
use common_catalog::parse_catalog_and_schema_from_db_string;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_query::Output;
|
||||
use common_telemetry::{error, logging, tracing, warn};
|
||||
use common_telemetry::{debug, error, logging, tracing, warn};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use itertools::Itertools;
|
||||
use opensrv_mysql::{
|
||||
@@ -291,17 +291,16 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
|
||||
let plan = match replace_params_with_values(&plan, param_types, ¶ms) {
|
||||
Ok(plan) => plan,
|
||||
Err(e) => {
|
||||
if e.status_code().should_log_error() {
|
||||
error!(e; "params: {}", params
|
||||
.iter()
|
||||
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
|
||||
.join(", "));
|
||||
}
|
||||
|
||||
w.error(
|
||||
ErrorKind::ER_TRUNCATED_WRONG_VALUE,
|
||||
format!(
|
||||
"err: {}, params: {}",
|
||||
e.output_msg(),
|
||||
params
|
||||
.iter()
|
||||
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
|
||||
.join(", ")
|
||||
)
|
||||
.as_bytes(),
|
||||
e.output_msg().as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
return Ok(());
|
||||
@@ -420,6 +419,15 @@ fn replace_params_with_values(
|
||||
) -> Result<LogicalPlan> {
|
||||
debug_assert_eq!(param_types.len(), params.len());
|
||||
|
||||
debug!(
|
||||
"replace_params_with_values(param_types: {:#?}, params: {:#?})",
|
||||
param_types,
|
||||
params
|
||||
.iter()
|
||||
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
|
||||
.join(", ")
|
||||
);
|
||||
|
||||
let mut values = Vec::with_capacity(params.len());
|
||||
|
||||
for (i, param) in params.iter().enumerate() {
|
||||
|
||||
@@ -15,12 +15,12 @@
|
||||
use std::ops::ControlFlow;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::{NaiveDate, NaiveDateTime};
|
||||
use chrono::NaiveDate;
|
||||
use common_query::prelude::ScalarValue;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::{self, Value};
|
||||
use itertools::Itertools;
|
||||
use opensrv_mysql::{ParamValue, ValueInner};
|
||||
use opensrv_mysql::{to_naive_datetime, ParamValue, ValueInner};
|
||||
use snafu::ResultExt;
|
||||
use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut};
|
||||
use sql::statements::statement::Statement;
|
||||
@@ -171,9 +171,29 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
|
||||
let date: common_time::Date = NaiveDate::from(param.value).into();
|
||||
Ok(ScalarValue::Date32(Some(date.val())))
|
||||
}
|
||||
ValueInner::Datetime(_) => Ok(ScalarValue::Date64(Some(
|
||||
NaiveDateTime::from(param.value).timestamp_millis(),
|
||||
))),
|
||||
ValueInner::Datetime(_) => {
|
||||
let timestamp_millis = to_naive_datetime(param.value)
|
||||
.map_err(|e| {
|
||||
error::MysqlValueConversionSnafu {
|
||||
err_msg: e.to_string(),
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.timestamp_millis();
|
||||
|
||||
match t {
|
||||
ConcreteDataType::DateTime(_) => Ok(ScalarValue::Date64(Some(timestamp_millis))),
|
||||
ConcreteDataType::Timestamp(_) => Ok(ScalarValue::TimestampMillisecond(
|
||||
Some(timestamp_millis),
|
||||
None,
|
||||
)),
|
||||
_ => error::PreparedStmtTypeMismatchSnafu {
|
||||
expected: t,
|
||||
actual: param.coltype,
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
ValueInner::Time(_) => Ok(ScalarValue::Time64Nanosecond(Some(
|
||||
Duration::from(param.value).as_millis() as i64,
|
||||
))),
|
||||
|
||||
@@ -60,6 +60,7 @@ macro_rules! sql_tests {
|
||||
test_postgres_auth,
|
||||
test_postgres_crud,
|
||||
test_postgres_parameter_inference,
|
||||
test_mysql_prepare_stmt_insert_timestamp,
|
||||
);
|
||||
)*
|
||||
};
|
||||
@@ -579,3 +580,78 @@ pub async fn test_mysql_async_timestamp(store_type: StorageType) {
|
||||
let _ = fe_mysql_server.shutdown().await;
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_mysql_prepare_stmt_insert_timestamp(store_type: StorageType) {
|
||||
let (addr, mut guard, server) =
|
||||
setup_mysql_server(store_type, "test_mysql_prepare_stmt_insert_timestamp").await;
|
||||
|
||||
let pool = MySqlPoolOptions::new()
|
||||
.max_connections(2)
|
||||
.connect(&format!("mysql://{addr}/public"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
sqlx::query("create table demo(i bigint, ts timestamp time index)")
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Valid timestamp binary encoding: https://mariadb.com/kb/en/resultset-row/#timestamp-binary-encoding
|
||||
|
||||
// Timestamp data length = 4, year-month-day(ymd) only:
|
||||
sqlx::query("insert into demo values(?, ?)")
|
||||
.bind(0)
|
||||
.bind(
|
||||
NaiveDate::from_ymd_opt(2023, 12, 19)
|
||||
// Though hour, minute and second are provided, `sqlx` will not encode them if they are all zeroes,
|
||||
// which is just what we desire here.
|
||||
// See https://github.com/launchbadge/sqlx/blob/bb064e3789d68ad4e9affe7cba34944abb000f72/sqlx-core/src/mysql/types/chrono.rs#L186C22-L186C22
|
||||
.and_then(|x| x.and_hms_opt(0, 0, 0))
|
||||
.unwrap(),
|
||||
)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Timestamp data length = 7, ymd and hour-minute-second(hms):
|
||||
sqlx::query("insert into demo values(?, ?)")
|
||||
.bind(1)
|
||||
.bind(
|
||||
NaiveDate::from_ymd_opt(2023, 12, 19)
|
||||
.and_then(|x| x.and_hms_opt(13, 19, 1))
|
||||
.unwrap(),
|
||||
)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Timestamp data length = 11, ymd, hms and microseconds:
|
||||
sqlx::query("insert into demo values(?, ?)")
|
||||
.bind(2)
|
||||
.bind(
|
||||
NaiveDate::from_ymd_opt(2023, 12, 19)
|
||||
.and_then(|x| x.and_hms_micro_opt(13, 20, 1, 123456))
|
||||
.unwrap(),
|
||||
)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let rows = sqlx::query("select i, ts from demo order by i")
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(rows.len(), 3);
|
||||
|
||||
let x: DateTime<Utc> = rows[0].get(1);
|
||||
assert_eq!(x.to_string(), "2023-12-19 00:00:00 UTC");
|
||||
|
||||
let x: DateTime<Utc> = rows[1].get(1);
|
||||
assert_eq!(x.to_string(), "2023-12-19 13:19:01 UTC");
|
||||
|
||||
let x: DateTime<Utc> = rows[2].get(1);
|
||||
assert_eq!(x.to_string(), "2023-12-19 13:20:01.123 UTC");
|
||||
|
||||
let _ = server.shutdown().await;
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user