fix: correct mysql timestamp encoding for binary protocol (#2797)

* fix: correct mysql timestamp encoding for binary protocol

* test: add tests for mysql timestamp encoding
This commit is contained in:
Ning Sun
2023-11-23 12:54:59 -06:00
committed by GitHub
parent 102e43aace
commit 85eebcb16f
7 changed files with 238 additions and 67 deletions

112
Cargo.lock generated
View File

@@ -840,17 +840,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "bigdecimal"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6773ddc0eafc0e509fb60e48dff7f450f8e674a0686ae8605e8d9901bd5eefa"
dependencies = [
"num-bigint",
"num-integer",
"num-traits",
]
[[package]]
name = "bigdecimal"
version = "0.4.2"
@@ -1019,6 +1008,15 @@ dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "btoi"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd6407f73a9b8b6162d8a2ef999fe6afd7cc15902ebf42c5cd296addf17e0ad"
dependencies = [
"num-traits",
]
[[package]]
name = "build-data"
version = "0.1.5"
@@ -1643,7 +1641,7 @@ name = "common-decimal"
version = "0.4.3"
dependencies = [
"arrow",
"bigdecimal 0.4.2",
"bigdecimal",
"common-error",
"common-macro",
"rust_decimal",
@@ -4120,6 +4118,15 @@ dependencies = [
"cpufeatures",
]
[[package]]
name = "keyed_priority_queue"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d"
dependencies = [
"indexmap 2.1.0",
]
[[package]]
name = "lalrpop"
version = "0.19.12"
@@ -4166,15 +4173,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lexical"
version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7aefb36fd43fef7003334742cbf77b243fcd36418a1d1bdd480d613a67968f6"
dependencies = [
"lexical-core",
]
[[package]]
name = "lexical-core"
version = "0.8.5"
@@ -4424,11 +4422,11 @@ dependencies = [
[[package]]
name = "lru"
version = "0.10.1"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "718e8fae447df0c7e1ba7f5189829e63fd536945c8988d61444c19039f16b670"
checksum = "2994eeba8ed550fd9b47a0b38f0242bc3344e496483c6180b69139cc2fa5d1d7"
dependencies = [
"hashbrown 0.13.2",
"hashbrown 0.14.2",
]
[[package]]
@@ -4908,8 +4906,9 @@ dependencies = [
[[package]]
name = "mysql_async"
version = "0.32.1"
source = "git+https://github.com/blackbeam/mysql_async.git?rev=32c6f2a986789f97108502c2d0c755a089411b66#32c6f2a986789f97108502c2d0c755a089411b66"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6750b17ce50f8f112ef1a8394121090d47c596b56a6a17569ca680a9626e2ef2"
dependencies = [
"bytes",
"crossbeam",
@@ -4917,15 +4916,16 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"keyed_priority_queue",
"lazy_static",
"lru",
"mio",
"mysql_common",
"once_cell",
"pem 2.0.1",
"pem 3.0.2",
"percent-encoding",
"pin-project",
"priority-queue",
"rand",
"rustls 0.21.9",
"rustls-pemfile",
"serde",
@@ -4938,20 +4938,21 @@ dependencies = [
"twox-hash",
"url",
"webpki",
"webpki-roots 0.23.1",
"webpki-roots 0.25.3",
]
[[package]]
name = "mysql_common"
version = "0.30.6"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57349d5a326b437989b6ee4dc8f2f34b0cc131202748414712a8e7d98952fc8c"
checksum = "06f19e4cfa0ab5a76b627cec2d81331c49b034988eaf302c3bafeada684eadef"
dependencies = [
"base64 0.21.5",
"bigdecimal 0.3.1",
"bigdecimal",
"bindgen",
"bitflags 2.4.1",
"bitvec",
"btoi",
"byteorder",
"bytes",
"cc",
@@ -4961,7 +4962,6 @@ dependencies = [
"flate2",
"frunk",
"lazy_static",
"lexical",
"mysql-common-derive",
"num-bigint",
"num-traits",
@@ -4978,6 +4978,7 @@ dependencies = [
"thiserror",
"time",
"uuid",
"zstd 0.12.4",
]
[[package]]
@@ -5349,9 +5350,9 @@ dependencies = [
[[package]]
name = "opensrv-mysql"
version = "0.4.1"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c66063eb6aca9e6b5354f91db29f7244a8e7f9c01219b3ce76a5340a78d9f6f"
checksum = "208bfa36c4b4a8d6ac90eda62e34efa66f7e692df91bd3626bc47329844a86b1"
dependencies = [
"async-trait",
"byteorder",
@@ -5821,6 +5822,16 @@ dependencies = [
"serde",
]
[[package]]
name = "pem"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3163d2912b7c3b52d651a055f2c7eec9ba5cd22d26ef75b8dd3a59980b185923"
dependencies = [
"base64 0.21.5",
"serde",
]
[[package]]
name = "pem-rfc7468"
version = "0.3.1"
@@ -6218,16 +6229,6 @@ dependencies = [
"syn 2.0.39",
]
[[package]]
name = "priority-queue"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff39edfcaec0d64e8d0da38564fad195d2d51b680940295fcc307366e101e61"
dependencies = [
"autocfg",
"indexmap 1.9.3",
]
[[package]]
name = "proc-macro-crate"
version = "1.3.1"
@@ -7408,7 +7409,7 @@ checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
dependencies = [
"log",
"ring 0.17.5",
"rustls-webpki 0.101.7",
"rustls-webpki",
"sct",
]
@@ -7433,16 +7434,6 @@ dependencies = [
"base64 0.21.5",
]
[[package]]
name = "rustls-webpki"
version = "0.100.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3"
dependencies = [
"ring 0.16.20",
"untrusted 0.7.1",
]
[[package]]
name = "rustls-webpki"
version = "0.101.7"
@@ -9211,6 +9202,7 @@ dependencies = [
"itertools 0.10.5",
"meta-client",
"meta-srv",
"mysql_async",
"num_cpus",
"object-store",
"once_cell",
@@ -9236,6 +9228,7 @@ dependencies = [
"substrait 0.4.3",
"table",
"tempfile",
"time",
"tokio",
"tokio-postgres",
"tonic 0.10.2",
@@ -10477,12 +10470,9 @@ dependencies = [
[[package]]
name = "webpki-roots"
version = "0.23.1"
version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338"
dependencies = [
"rustls-webpki 0.100.3",
]
checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10"
[[package]]
name = "which"

View File

@@ -15,10 +15,11 @@
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use chrono::{LocalResult, NaiveDateTime};
use chrono::{LocalResult, NaiveDateTime, TimeZone as ChronoTimeZone, Utc};
use serde::{Deserialize, Serialize};
use crate::error::{Error, InvalidDateStrSnafu, Result};
use crate::timezone::TimeZone;
use crate::util::{format_utc_datetime, local_datetime_to_utc};
use crate::Date;
@@ -108,6 +109,15 @@ impl DateTime {
NaiveDateTime::from_timestamp_millis(self.0)
}
pub fn to_chrono_datetime_with_timezone(&self, tz: Option<TimeZone>) -> Option<NaiveDateTime> {
let datetime = self.to_chrono_datetime();
datetime.map(|v| match tz {
Some(TimeZone::Offset(offset)) => offset.from_utc_datetime(&v).naive_local(),
Some(TimeZone::Named(tz)) => tz.from_utc_datetime(&v).naive_local(),
None => Utc.from_utc_datetime(&v).naive_local(),
})
}
/// Convert to [common_time::date].
pub fn to_date(&self) -> Option<Date> {
self.to_chrono_datetime().map(|d| Date::from(d.date()))

View File

@@ -21,7 +21,7 @@ use std::time::Duration;
use arrow::datatypes::TimeUnit as ArrowTimeUnit;
use chrono::{
DateTime, LocalResult, NaiveDate, NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone,
DateTime, LocalResult, NaiveDate, NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone, Utc,
};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -252,6 +252,15 @@ impl Timestamp {
NaiveDateTime::from_timestamp_opt(sec, nsec)
}
pub fn to_chrono_datetime_with_timezone(&self, tz: Option<TimeZone>) -> Option<NaiveDateTime> {
let datetime = self.to_chrono_datetime();
datetime.map(|v| match tz {
Some(TimeZone::Offset(offset)) => offset.from_utc_datetime(&v).naive_local(),
Some(TimeZone::Named(tz)) => tz.from_utc_datetime(&v).naive_local(),
None => Utc.from_utc_datetime(&v).naive_local(),
})
}
/// Convert timestamp to chrono date.
pub fn to_chrono_date(&self) -> Option<NaiveDate> {
self.to_chrono_datetime().map(|ndt| ndt.date())

View File

@@ -54,7 +54,7 @@ lazy_static.workspace = true
mime_guess = "2.0"
once_cell.workspace = true
openmetrics-parser = "0.4"
opensrv-mysql = "0.4"
opensrv-mysql = "0.5"
opentelemetry-proto.workspace = true
parking_lot = "0.12"
pgwire = "0.16"
@@ -103,7 +103,7 @@ catalog = { workspace = true, features = ["testing"] }
client.workspace = true
common-base.workspace = true
common-test-util.workspace = true
mysql_async = { git = "https://github.com/blackbeam/mysql_async.git", rev = "32c6f2a986789f97108502c2d0c755a089411b66", default-features = false, features = [
mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls",
] }
rand.workspace = true

View File

@@ -192,9 +192,11 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Value::String(v) => row_writer.write_col(v.as_utf8())?,
Value::Binary(v) => row_writer.write_col(v.deref())?,
Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
Value::DateTime(v) => row_writer.write_col(v.to_chrono_datetime())?,
// convert datetime and timestamp to timezone of current connection
Value::DateTime(v) => row_writer
.write_col(v.to_chrono_datetime_with_timezone(query_context.time_zone()))?,
Value::Timestamp(v) => row_writer
.write_col(v.to_timezone_aware_string(query_context.time_zone()))?,
.write_col(v.to_chrono_datetime_with_timezone(query_context.time_zone()))?,
Value::Interval(v) => row_writer.write_col(v.to_iso8601_string())?,
Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
Value::List(_) => {

View File

@@ -36,6 +36,9 @@ frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls",
] }
object-store.workspace = true
once_cell.workspace = true
operator.workspace = true
@@ -59,6 +62,7 @@ sqlx = { version = "0.6", features = [
substrait.workspace = true
table.workspace = true
tempfile.workspace = true
time = "0.3"
tokio.workspace = true
tonic.workspace = true
tower = "0.4"

View File

@@ -56,6 +56,7 @@ macro_rules! sql_tests {
test_mysql_auth,
test_mysql_crud,
test_mysql_timezone,
test_mysql_async_timestamp,
test_postgres_auth,
test_postgres_crud,
test_postgres_parameter_inference,
@@ -423,3 +424,158 @@ pub async fn test_postgres_parameter_inference(store_type: StorageType) {
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_async_timestamp(store_type: StorageType) {
use mysql_async::prelude::*;
use time::PrimitiveDateTime;
#[derive(Debug)]
struct CpuMetric {
hostname: String,
environment: String,
usage_user: f64,
usage_system: f64,
usage_idle: f64,
ts: i64,
}
impl CpuMetric {
fn new(
hostname: String,
environment: String,
usage_user: f64,
usage_system: f64,
usage_idle: f64,
ts: i64,
) -> Self {
Self {
hostname,
environment,
usage_user,
usage_system,
usage_idle,
ts,
}
}
}
common_telemetry::init_default_ut_logging();
let (addr, mut guard, fe_mysql_server) = setup_mysql_server(store_type, "sql_timestamp").await;
let url = format!("mysql://{addr}/public");
let opts = mysql_async::Opts::from_url(&url).unwrap();
let mut conn = mysql_async::Conn::new(opts)
.await
.expect("create connection failure");
r"CREATE TABLE IF NOT EXISTS cpu_metrics (
hostname STRING,
environment STRING,
usage_user DOUBLE,
usage_system DOUBLE,
usage_idle DOUBLE,
ts TIMESTAMP,
TIME INDEX(ts),
PRIMARY KEY(hostname, environment)
);"
.ignore(&mut conn)
.await
.expect("create table failure");
let metrics = vec![
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307200050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307200050,
),
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307260050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307260050,
),
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307320050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307320050,
),
];
r"INSERT INTO cpu_metrics (hostname, environment, usage_user, usage_system, usage_idle, ts)
VALUES (:hostname, :environment, :usage_user, :usage_system, :usage_idle, :ts)"
.with(metrics.iter().map(|metric| {
params! {
"hostname" => &metric.hostname,
"environment" => &metric.environment,
"usage_user" => metric.usage_user,
"usage_system" => metric.usage_system,
"usage_idle" => metric.usage_idle,
"ts" => metric.ts,
}
}))
.batch(&mut conn)
.await
.expect("insert data failure");
// query data
let loaded_metrics = "SELECT * FROM cpu_metrics"
.with(())
.map(
&mut conn,
|(hostname, environment, usage_user, usage_system, usage_idle, raw_ts): (
String,
String,
f64,
f64,
f64,
PrimitiveDateTime,
)| {
let ts = raw_ts.assume_utc().unix_timestamp() * 1000;
CpuMetric::new(
hostname,
environment,
usage_user,
usage_system,
usage_idle,
ts,
)
},
)
.await
.expect("query data failure");
assert_eq!(loaded_metrics.len(), 6);
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}