Files
greptimedb/tests-integration/tests/sql.rs
Yingwen 0ef54511f7 chore: pick fixes and bump version to v1.0.2 (#8116)
* fix: window sort off by one precision TimeRange&better alias track (#8019)

* fix: window sort track alias&off by one precision TimeRange

Signed-off-by: discord9 <discord9@163.com>

* chore: more test

Signed-off-by: discord9 <discord9@163.com>

* refactor: clear helper

Signed-off-by: discord9 <discord9@163.com>

* dedup a bit

Signed-off-by: discord9 <discord9@163.com>

* feat: even more guard

Signed-off-by: discord9 <discord9@163.com>

* fix: case insensitive

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
(cherry picked from commit 9fafd879ed)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix(server): describe EXPLAIN statements so bind parameters work (#8035)

* fix(server): describe EXPLAIN statements so bind parameters work

`do_describe_inner` only planned `Insert`/`Query`/`Delete`, so
`EXPLAIN` and `EXPLAIN ANALYZE` fell through to the non-plan branch
and had no parameter-type inference. At Bind time the Postgres
handler then reported `unsupported_parameter_type` even though the
inner query would have worked on its own.

Recurse one level into `Statement::Explain` so that an EXPLAIN
wrapping a plannable statement goes through the same describe path.
Adds a tokio-postgres integration test that exercises
`EXPLAIN`/`EXPLAIN ANALYZE` over the extended query protocol.

Fixes #8029

Signed-off-by: BootstrapperSBL <yvanwww@gmail.com>

* refactor(server): extract plannable-inner check into closure

Reduce duplication between the direct match and the EXPLAIN inner match
by factoring out is_inner_plannable. Behaviour unchanged.

Signed-off-by: BootstrapperSBL <yvanwww@gmail.com>

---------

Signed-off-by: BootstrapperSBL <yvanwww@gmail.com>
(cherry picked from commit 793545d8e6)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: windows windowed sort ci (#8039)

* fix: windows windowed sort ci

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
(cherry picked from commit 760581b2a0)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: batched prometheus ingest row metric (#8054)

* fix: count batched prometheus ingest rows

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: align batched ingest metrics

Use actual affected rows when updating `DIST_INGEST_ROW_COUNT` and cache the flush database label to avoid repeated `get_db_string` allocation.

Files: `src/servers/src/pending_rows_batcher.rs`
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
(cherry picked from commit f0b3ee4830)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: preserve case in database name from connection string (#8062)

`parse_optional_catalog_and_schema_from_db_string` unconditionally
lowercased database/schema names, causing quoted database names (e.g.
`CREATE DATABASE "TestQuery"`) to be stored with preserved case but
looked up as lowercase on connection, resulting in "Database not found".

Fixes #8059

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
(cherry picked from commit f5c1d5d9bc)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix(metric-engine): validate column types and require time index in verify_rows (#8018)

* fix(metric-engine): validate column types and require time index in verify_rows

The remote-write path into the metric engine previously bypassed schema
validation. When a row's time index column carried a non-timestamp
datatype (e.g. a string), the request reached mito's ValueBuilder::push
for the timestamp builder and panicked instead of surfacing a typed
error.

Cache the (column_id, data_type, semantic_type) tuple for each physical
column on PhysicalRegionState and use it in verify_rows to:

- reject columns whose datatype or semantic type disagrees with the
  physical region's schema (mirrors mito's WriteRequest::check_schema)
- reject requests that omit the time index column entirely

Field columns stay optional; tag completeness needs per-logical-region
metadata that verify_rows doesn't have and is left to a follow-up.

Fixes #7990.

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>

* refactor(metric-engine): simplify PhysicalColumnInfo construction

- Add From<ColumnMetadata> and From<&ColumnMetadata> for PhysicalColumnInfo
  so call sites can use metadata.into() instead of repeating the field list.
- Replace the four struct-literal constructions in create.rs, open.rs and
  alter.rs with the conversion.
- In verify_rows, pass &col.column_name to ColumnNotFoundSnafu instead of
  cloning it explicitly (snafu's context handles the conversion).

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>

* perf(metric-engine): cache time index column name in PhysicalRegionState

verify_rows previously scanned every physical column on each row batch to
find the timestamp column. Since the time index is fixed at region
creation and never changes, stash its name on PhysicalRegionState when
the region is first registered and read it directly from there.

add_physical_columns carries a debug_assert to document the invariant
that alter never introduces a new time index.

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>

* perf(metric-engine): borrow physical column names when building name_to_id

On the row-write path we built a HashMap<String, ColumnId> by cloning
every column name out of the physical region's cached state. The map is
scoped to the block that holds the state's read guard, so there's no
need to own the keys.

Switch the map to HashMap<&str, ColumnId> and widen RowsIter::new /
IterIndex::new to accept any key type that borrows as str. Existing
test helpers that pass HashMap<String, ColumnId> keep working through
the Borrow<str> bound.

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>

* fix: validate metric rows against physical schema

Cache physical column metadata in the metric engine state so row validation and row modification can use the same source of truth for column IDs, data types, and semantic types.

Validate incoming metric rows against the physical schema before writes. Put requests now require the time index and the expected field column, while delete requests keep accepting primary-key-plus-timestamp payloads by skipping the field completeness check.

Pass physical column metadata directly into RowsIter instead of rebuilding a name-to-column-id map at each call site, and cover the new validation paths with tests for missing time indexes, missing fields, and duplicate field columns.

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: do not allow adding a new field

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fill default value for fields

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fill default for nullable fields

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Co-authored-by: BootstrapperSBL <yvanwww01@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
(cherry picked from commit d1873ca31d)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: type inference for sql rewrite (#8052)

fix: type inference for rewrited sql
(cherry picked from commit 5b47ec24ec)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: infer time index from column meta on derived table (#8013)

* rough fix

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reorganize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplification

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* enhance default by infer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* supply comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
(cherry picked from commit 0d90f7407c)
Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: pre-cast constants (#7926)

* init impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle no cast

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor using common-expr

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* extend matching pattern

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* more tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplification

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix zero timestamp

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: normalize sqlness partition count output

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: normalize remaining sqlness plan output

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: normalize sqlness repartition details in tql explain

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: tighten const normalization casts

* test: normalize standalone tql explain repartition output

* resolve cr comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
(cherry picked from commit 9133d0464f)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix(mito): ignore compaction override in enum option validation (#8094)

* fix(mito): ignore compaction override in enum option validation

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* test: cover compaction override without compaction type

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* fix(mito): short-circuit enum option validation

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

---------

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>
(cherry picked from commit 73c267e641)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix(mito2): drop unsound time-filter cache-key stripping (#8105)

* fix(mito2): drop unsound time-filter cache-key stripping

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comments and test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
(cherry picked from commit 5e468190a5)
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: remap batch table route addresses (#8109)

(cherry picked from commit a04fa52486)
Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: bump version to v1.0.2

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: avoid stale route update during repartition allocation (#8115)

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update sqlness result

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Signed-off-by: BootstrapperSBL <yvanwww@gmail.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: Yvan Wang <131545713+BootstrapperSBL@users.noreply.github.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
Co-authored-by: BootstrapperSBL <yvanwww01@gmail.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: QuakeWang <45645138+QuakeWang@users.noreply.github.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
2026-05-14 20:18:55 +08:00

1747 lines
53 KiB
Rust

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::Duration;
use auth::user_provider_from_option;
use chrono::{DateTime, NaiveDate, NaiveDateTime, SecondsFormat, Utc};
use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
use common_frontend::slow_query_event::{
SLOW_QUERY_TABLE_COST_COLUMN_NAME, SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
};
use sqlx::mysql::{MySqlConnection, MySqlDatabaseError, MySqlPoolOptions};
use sqlx::postgres::{PgDatabaseError, PgPoolOptions};
use sqlx::types::Decimal;
use sqlx::{Connection, Executor, Row};
use tests_integration::test_util::{
StorageType, setup_mysql_server, setup_mysql_server_with_slow_query_threshold,
setup_mysql_server_with_user_provider, setup_pg_server,
setup_pg_server_with_slow_query_threshold, setup_pg_server_with_user_provider,
};
use tokio_postgres::{Client, NoTls, SimpleQueryMessage};
#[macro_export]
macro_rules! sql_test {
($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
paste::item! {
mod [<integration_sql_ $service:lower _test>] {
$(
#[tokio::test(flavor = "multi_thread")]
$(
#[$meta]
)*
async fn [< $test >]() {
common_telemetry::init_default_ut_logging();
let store_type = tests_integration::test_util::StorageType::$service;
if store_type.test_on() {
common_telemetry::info!("test {} starts, store_type: {:?}", stringify!($test), store_type);
let _ = $crate::sql::$test(store_type).await;
}
}
)*
}
}
};
}
#[macro_export]
macro_rules! sql_tests {
($($service:ident),*) => {
$(
sql_test!(
$service,
test_mysql_auth,
test_mysql_stmts,
test_mysql_crud,
test_mysql_timezone,
test_mysql_async_timestamp,
test_mysql_slow_query,
test_postgres_auth,
test_postgres_crud,
test_postgres_timezone,
test_postgres_bytea,
test_postgres_slow_query,
test_postgres_datestyle,
test_postgres_intervalstyle,
test_postgres_parameter_inference,
test_postgres_uint64_parameter,
test_postgres_explain_bind_parameter,
test_postgres_array_types,
test_mysql_prepare_stmt_insert_timestamp,
test_declare_fetch_close_cursor,
test_alter_update_on,
);
)*
};
}
pub async fn test_mysql_auth(store_type: StorageType) {
let user_provider = user_provider_from_option(
"static_user_provider:cmd:greptime_user=greptime_pwd,readonly_user:ro=readonly_pwd,writeonly_user:wo=writeonly_pwd",
)
.unwrap();
let (mut guard, fe_mysql_server) =
setup_mysql_server_with_user_provider(store_type, "sql_crud", Some(user_provider)).await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
// 1. no auth
let conn_re = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await;
assert!(conn_re.is_err());
assert_eq!(
conn_re
.err()
.unwrap()
.into_database_error()
.unwrap()
.downcast::<MySqlDatabaseError>()
.code(),
Some("28000")
);
// 2. wrong pwd
let conn_re = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://greptime_user:wrong_pwd@{addr}/public"))
.await;
assert!(conn_re.is_err());
assert_eq!(
conn_re
.err()
.unwrap()
.into_database_error()
.unwrap()
.downcast::<MySqlDatabaseError>()
.code(),
Some("28000")
);
// 3. right pwd
let conn_re = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://greptime_user:greptime_pwd@{addr}/public"))
.await;
assert!(conn_re.is_ok());
// 4. readonly user
let conn_re = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://readonly_user:readonly_pwd@{addr}/public"))
.await;
assert!(conn_re.is_ok());
let pool = conn_re.unwrap();
let _ = pool.execute("SELECT 1").await.unwrap();
let err = pool
.execute("CREATE TABLE test (ts timestamp time index)")
.await
.unwrap_err();
assert!(
err.to_string()
.contains("(PermissionDenied): User is not authorized to perform this action"),
"{}",
err.to_string()
);
// 5. writeonly user
let conn_re = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!(
"mysql://writeonly_user:writeonly_pwd@{addr}/public"
))
.await;
assert!(conn_re.is_ok());
let pool = conn_re.unwrap();
let _ = pool
.execute("CREATE TABLE test (ts timestamp time index)")
.await
.unwrap();
let err = pool.execute("SHOW TABLES").await.unwrap_err();
assert!(
err.to_string()
.contains("(PermissionDenied): User is not authorized to perform this action"),
"{}",
err.to_string()
);
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_stmts(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (mut guard, fe_mysql_server) = setup_mysql_server(store_type, "test_mysql_stmts").await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
let mut conn = MySqlConnection::connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
conn.execute("SET SESSION TRANSACTION READ ONLY")
.await
.unwrap();
conn.execute("SET TRANSACTION READ ONLY").await.unwrap();
// empty statements
// Only when "--" is followed by a whitespace is it considered a valid comment in MySQL,
// see https://dev.mysql.com/doc/refman/8.4/en/ansi-diff-comments.html
let err = conn.execute(" -- ----- ;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let err = conn.execute("-- --------\n;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let err = conn.execute(" ;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let err = conn.execute(" \n ;").await.unwrap_err();
assert!(err.to_string().contains("empty statements"));
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_crud(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (mut guard, fe_mysql_server) = setup_mysql_server(store_type, "test_mysql_crud").await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
let pool = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
sqlx::query(
"create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt timestamp(3) default null, b blob default null, j json, v vector(3) default null)",
)
.execute(&pool)
.await
.unwrap();
for i in 0..10 {
let dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
chrono::DateTime::from_timestamp(60, i).unwrap().naive_utc(),
Utc,
);
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
let hello = format!("hello{i}");
let bytes = hello.as_bytes();
let json = serde_json::json!({
"code": i,
"success": true,
"payload": {
"features": [
"serde",
"json"
],
"homepage": null
}
});
let vector = "[1,2,3]";
sqlx::query("insert into demo values(?, ?, ?, ?, ?, ?, ?)")
.bind(i)
.bind(i)
.bind(d)
.bind(dt)
.bind(bytes)
.bind(json)
.bind(vector)
.execute(&pool)
.await
.unwrap();
}
let rows = sqlx::query("select i, d, dt, b, j, v from demo")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 10);
for (i, row) in rows.iter().enumerate() {
let ret: i64 = row.get("i");
let d: NaiveDate = row.get("d");
let dt: DateTime<Utc> = row.get("dt");
let bytes: Vec<u8> = row.get("b");
let json: serde_json::Value = row.get("j");
let vector: Vec<u8> = row.get("v");
assert_eq!(ret, i as i64);
let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
assert_eq!(expected_d, d);
let expected_dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
chrono::DateTime::from_timestamp(60, i as u32)
.unwrap()
.naive_utc(),
Utc,
);
assert_eq!(
format!("{}", expected_dt.format("%Y-%m-%d %H:%M:%S")),
format!("{}", dt.format("%Y-%m-%d %H:%M:%S"))
);
assert_eq!(format!("hello{i}"), String::from_utf8_lossy(&bytes));
let expected_j = serde_json::json!({
"code": i,
"success": true,
"payload": {
"features": [
"serde",
"json"
],
"homepage": null
}
});
assert_eq!(json, expected_j);
assert_eq!(vector, "[1,2,3]".as_bytes());
}
let rows = sqlx::query("select i from demo where i=?")
.bind(6)
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 1);
for row in rows {
let ret: i64 = row.get("i");
assert_eq!(ret, 6);
}
// parameter type mismatch
let query_re = sqlx::query("select i from demo where i = ?")
.bind("test")
.fetch_all(&pool)
.await;
assert!(query_re.is_err());
let err = query_re.unwrap_err();
common_telemetry::info!("Error is {}", err);
assert_eq!(
err.into_database_error()
.unwrap()
.downcast::<MySqlDatabaseError>()
.number(),
1210,
);
let _ = sqlx::query("delete from demo")
.execute(&pool)
.await
.unwrap();
let rows = sqlx::query("select i from demo")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 0);
// test prepare with default columns
sqlx::query("insert into demo(i) values(?)")
.bind(99)
.execute(&pool)
.await
.unwrap();
sqlx::query("insert into demo(i) values(?)")
.bind(-99)
.execute(&pool)
.await
.unwrap();
let rows = sqlx::query("select * from demo")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 2);
for row in rows {
let i: i64 = row.get("i");
let ts: DateTime<Utc> = row.get("ts");
let now = common_time::util::current_time_millis();
assert!(now - ts.timestamp_millis() < 1000);
assert_eq!(i.abs(), 99);
}
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_timezone(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (mut guard, fe_mysql_server) = setup_mysql_server(store_type, "test_mysql_timezone").await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
let mut conn = MySqlConnection::connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
let _ = conn
.execute("SET time_zone = 'Asia/Shanghai'")
.await
.unwrap();
let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
assert_eq!(timezone[0].get::<String, usize>(0), "Asia/Shanghai");
let timezone = conn.fetch_all("SELECT @@session.time_zone").await.unwrap();
assert_eq!(timezone[0].get::<String, usize>(0), "Asia/Shanghai");
let timezone = conn.fetch_all("SELECT @@system_time_zone").await.unwrap();
assert_eq!(timezone[0].get::<String, usize>(0), "UTC");
let _ = conn.execute("SET time_zone = 'UTC'").await.unwrap();
let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
assert_eq!(timezone[0].get::<String, usize>(0), "UTC");
let timezone = conn.fetch_all("SELECT @@system_time_zone").await.unwrap();
assert_eq!(timezone[0].get::<String, usize>(0), "UTC");
// test data
let _ = conn
.execute("create table demo(i bigint, ts timestamp time index)")
.await
.unwrap();
let _ = conn
.execute("insert into demo values(1, 1667446797450)")
.await
.unwrap();
let rows = conn.fetch_all("select ts from demo").await.unwrap();
assert_eq!(
rows[0]
.get::<chrono::DateTime<Utc>, usize>(0)
.to_rfc3339_opts(SecondsFormat::Millis, true),
"2022-11-03T03:39:57.450Z"
);
let _ = conn.execute("SET time_zone = '+08:00'").await.unwrap();
let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
assert_eq!(timezone[0].get::<String, usize>(0), "+08:00");
let timezone = conn.fetch_all("SELECT @@session.time_zone").await.unwrap();
assert_eq!(timezone[0].get::<String, usize>(0), "+08:00");
let rows2 = conn.fetch_all("select ts from demo").await.unwrap();
// we use Utc here for format only
assert_eq!(
rows2[0]
.get::<chrono::DateTime<Utc>, usize>(0)
.to_rfc3339_opts(SecondsFormat::Millis, true),
"2022-11-03T11:39:57.450Z"
);
let _ = conn
.execute("SET @@SESSION.TIME_ZONE = '-7:00'")
.await
.unwrap();
let rows2 = conn.fetch_all("select ts from demo").await.unwrap();
// we use Utc here for format only
assert_eq!(
rows2[0]
.get::<chrono::DateTime<Utc>, usize>(0)
.to_rfc3339_opts(SecondsFormat::Millis, true),
"2022-11-02T20:39:57.450Z"
);
let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
assert_eq!(timezone[0].get::<String, usize>(0), "-07:00");
let timezone = conn.fetch_all("SELECT @@session.time_zone").await.unwrap();
assert_eq!(timezone[0].get::<String, usize>(0), "-07:00");
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_auth(store_type: StorageType) {
let user_provider =
user_provider_from_option("static_user_provider:cmd:greptime_user=greptime_pwd").unwrap();
let (mut guard, fe_pg_server) =
setup_pg_server_with_user_provider(store_type, "sql_crud", Some(user_provider)).await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
// 1. no auth
let conn_re = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await;
assert!(conn_re.is_err());
assert_eq!(
conn_re
.err()
.unwrap()
.into_database_error()
.unwrap()
.downcast::<PgDatabaseError>()
.code(),
"28P01"
);
// 2. wrong pwd
let conn_re = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://greptime_user:wrong_pwd@{addr}/public"))
.await;
assert!(conn_re.is_err());
assert_eq!(
conn_re
.err()
.unwrap()
.into_database_error()
.unwrap()
.downcast::<PgDatabaseError>()
.code(),
"28P01"
);
// 2. right pwd
let conn_re = PgPoolOptions::new()
.max_connections(2)
.connect(&format!(
"postgres://greptime_user:greptime_pwd@{addr}/public"
))
.await;
assert!(conn_re.is_ok());
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_alter_update_on(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_crud").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await
.unwrap();
sqlx::query(
"create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob)",
)
.execute(&pool)
.await
.unwrap();
let row_before_alter = sqlx::query(
"SELECT *
FROM information_schema.tables WHERE table_name = $1;",
)
.bind("demo")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(row_before_alter.len(), 1);
let before_row = &row_before_alter[0];
let created_on: NaiveDateTime = before_row.get("create_time");
let updated_on_before: NaiveDateTime = before_row.get("update_time");
assert_eq!(created_on, updated_on_before);
std::thread::sleep(std::time::Duration::from_millis(1100));
sqlx::query("alter table demo add column j json;")
.execute(&pool)
.await
.unwrap();
let row_after_alter = sqlx::query(
"SELECT *
FROM information_schema.tables WHERE table_name = $1;",
)
.bind("demo")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(row_after_alter.len(), 1);
let after_row = &row_after_alter[0];
let updated_on_after: NaiveDateTime = after_row.get("update_time");
assert_ne!(updated_on_before, updated_on_after);
let _ = sqlx::query("delete from demo")
.execute(&pool)
.await
.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_crud(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_crud").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await
.unwrap();
sqlx::query(
"create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob, j json)",
)
.execute(&pool)
.await
.unwrap();
for i in 0..10 {
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
let dt = d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_micros();
let bytes = "hello".as_bytes();
let json = serde_json::json!({
"code": i,
"success": true,
"payload": {
"features": [
"serde",
"json"
],
"homepage": null
}
});
sqlx::query("insert into demo values($1, $2, $3, $4, $5, $6)")
.bind(i)
.bind(i)
.bind(d)
.bind(dt)
.bind(bytes)
.bind(json)
.execute(&pool)
.await
.unwrap();
}
let rows = sqlx::query("select i,d,dt,b,j from demo")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 10);
for (i, row) in rows.iter().enumerate() {
let ret: i64 = row.get("i");
let d: NaiveDate = row.get("d");
let dt: NaiveDateTime = row.get("dt");
let bytes: Vec<u8> = row.get("b");
let json: serde_json::Value = row.get("j");
assert_eq!(ret, i as i64);
let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
assert_eq!(expected_d, d);
let expected_dt = NaiveDate::from_yo_opt(2015, 100)
.and_then(|d| d.and_hms_opt(0, 0, 0))
.unwrap();
assert_eq!(expected_dt, dt);
assert_eq!("hello".as_bytes(), bytes);
let expected_j = serde_json::json!({
"code": i,
"success": true,
"payload": {
"features": [
"serde",
"json"
],
"homepage": null
}
});
assert_eq!(json.to_string(), expected_j.to_string());
}
let rows = sqlx::query("select i from demo where i=$1")
.bind(6)
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 1);
for row in rows {
let ret: i64 = row.get("i");
assert_eq!(ret, 6);
}
let _ = sqlx::query("delete from demo")
.execute(&pool)
.await
.unwrap();
let rows = sqlx::query("select i from demo")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 0);
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_slow_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (mut guard, fe_mysql_server) = setup_mysql_server_with_slow_query_threshold(
store_type,
"test_mysql_slow_query",
Duration::from_millis(100),
)
.await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
let pool = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
// The slow query should run longer than the configured threshold.
let slow_query = "SELECT count(*) FROM generate_series(1, 50000000)";
// Simulate a slow query.
sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table} WHERE {} = ?",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
);
let row = tokio::time::timeout(Duration::from_secs(10), async {
loop {
if let Ok(Some(row)) = sqlx::query(&query)
.bind(slow_query)
.fetch_optional(&pool)
.await
{
break row;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
})
.await
.unwrap();
// Check the results.
let cost: u64 = row.get(0);
let threshold: u64 = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);
assert!(cost > 0 && threshold > 0 && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_bytea(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_bytea").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let _ = client
.simple_query("CREATE TABLE test(b BLOB, ts TIMESTAMP TIME INDEX)")
.await
.unwrap();
let _ = client
.simple_query("INSERT INTO test VALUES(X'6162636b6c6d2aa954', 0)")
.await
.unwrap();
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
match &mess[1] {
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
_ => unreachable!(),
}
};
let r = client.simple_query("SELECT b FROM test").await.unwrap();
let b = get_row(r);
assert_eq!(b, "\\x6162636b6c6d2aa954");
let _ = client.simple_query("SET bytea_output='hex'").await.unwrap();
let r = client.simple_query("SELECT b FROM test").await.unwrap();
let b = get_row(r);
assert_eq!(b, "\\x6162636b6c6d2aa954");
let _ = client
.simple_query("SET bytea_output='escape'")
.await
.unwrap();
let r = client.simple_query("SELECT b FROM test").await.unwrap();
let b = get_row(r);
assert_eq!(b, "abcklm*\\251T");
let _e = client
.simple_query("SET bytea_output='invalid'")
.await
.unwrap_err();
// binary format shall not be affected by bytea_output
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await
.unwrap();
let row = sqlx::query("select b from test")
.fetch_one(&pool)
.await
.unwrap();
let val: Vec<u8> = row.get("b");
assert_eq!(val, [97, 98, 99, 107, 108, 109, 42, 169, 84]);
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_slow_query(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server_with_slow_query_threshold(
store_type,
"test_postgres_slow_query",
Duration::from_millis(100),
)
.await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await
.unwrap();
let slow_query = "SELECT count(*) FROM generate_series(1, 50000000)";
let _ = sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table} WHERE {} = $1",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
);
let row = tokio::time::timeout(Duration::from_secs(10), async {
loop {
if let Ok(Some(row)) = sqlx::query(&query)
.bind(slow_query)
.fetch_optional(&pool)
.await
{
break row;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
})
.await
.unwrap();
let cost: Decimal = row.get(0);
let threshold: Decimal = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);
assert!(cost > 0.into() && threshold > 0.into() && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_datestyle(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_datestyle").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let validate_datestyle = |client: Client, datestyle: &str, is_valid: bool| {
let datestyle = datestyle.to_string();
async move {
assert_eq!(
client
.simple_query(format!("SET DATESTYLE={}", datestyle).as_str())
.await
.is_ok(),
is_valid
);
client
}
};
// style followed by order is valid
let client = validate_datestyle(client, "'ISO,MDY'", true).await;
// Mix of string and ident is valid
let client = validate_datestyle(client, "'ISO',MDY", true).await;
// list of string that didn't corrupt is valid
let client = validate_datestyle(client, "'ISO,MDY','ISO,MDY'", true).await;
// corrupted style
let client = validate_datestyle(client, "'ISO,German'", false).await;
// corrupted order
let client = validate_datestyle(client, "'ISO,DMY','ISO,MDY'", false).await;
// as long as the value is not corrupted, it's valid
let client = validate_datestyle(client, "ISO,ISO,ISO,ISO,ISO,MDY,MDY,MDY,MDY", true).await;
let _ = client
.simple_query("CREATE TABLE ts_test(ts TIMESTAMP TIME INDEX)")
.await
.expect("CREATE TABLE ts_test ERROR");
let _ = client
.simple_query("CREATE TABLE date_test(d date, ts TIMESTAMP TIME INDEX)")
.await
.expect("CREATE TABLE date_test ERROR");
let _ = client
.simple_query("CREATE TABLE dt_test(dt datetime, ts TIMESTAMP TIME INDEX)")
.await
.expect("CREATE TABLE dt_test ERROR");
let _ = client
.simple_query("INSERT INTO ts_test VALUES('1997-12-17 07:37:16.123')")
.await
.expect("INSERT INTO ts_test ERROR");
let _ = client
.simple_query("INSERT INTO date_test VALUES('1997-12-17', '1997-12-17 07:37:16.123')")
.await
.expect("INSERT INTO date_test ERROR");
let _ = client
.simple_query(
"INSERT INTO dt_test VALUES('1997-12-17 07:37:16.123', '1997-12-17 07:37:16.123')",
)
.await
.expect("INSERT INTO dt_test ERROR");
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
match &mess[1] {
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
_ => unreachable!("Unexpected messages: {:?}", mess),
}
};
let date = "DATE";
let datetime = "TIMESTAMP";
let timestamp = "TIMESTAMP";
let iso = "ISO";
let sql = "SQL";
let postgres = "Postgres";
let german = "German";
let expected_set: HashMap<&str, HashMap<&str, HashMap<&str, &str>>> = HashMap::from([
(
date,
HashMap::from([
(
iso,
HashMap::from([
("MDY", "1997-12-17"),
("DMY", "1997-12-17"),
("YMD", "1997-12-17"),
]),
),
(
sql,
HashMap::from([
("MDY", "12/17/1997"),
("DMY", "17/12/1997"),
("YMD", "12/17/1997"),
]),
),
(
postgres,
HashMap::from([
("MDY", "12-17-1997"),
("DMY", "17-12-1997"),
("YMD", "12-17-1997"),
]),
),
(
german,
HashMap::from([
("MDY", "17.12.1997"),
("DMY", "17.12.1997"),
("YMD", "17.12.1997"),
]),
),
]),
),
(
timestamp,
HashMap::from([
(
iso,
HashMap::from([
("MDY", "1997-12-17 07:37:16.123000"),
("DMY", "1997-12-17 07:37:16.123000"),
("YMD", "1997-12-17 07:37:16.123000"),
]),
),
(
sql,
HashMap::from([
("MDY", "12/17/1997 07:37:16.123000"),
("DMY", "17/12/1997 07:37:16.123000"),
("YMD", "12/17/1997 07:37:16.123000"),
]),
),
(
postgres,
HashMap::from([
("MDY", "Wed Dec 17 07:37:16.123000 1997"),
("DMY", "Wed 17 Dec 07:37:16.123000 1997"),
("YMD", "Wed Dec 17 07:37:16.123000 1997"),
]),
),
(
german,
HashMap::from([
("MDY", "17.12.1997 07:37:16.123000"),
("DMY", "17.12.1997 07:37:16.123000"),
("YMD", "17.12.1997 07:37:16.123000"),
]),
),
]),
),
]);
let get_expected = |ty: &str, style: &str, order: &str| {
expected_set
.get(ty)
.and_then(|m| m.get(style))
.and_then(|m2| m2.get(order))
.unwrap()
.to_string()
};
for style in ["ISO", "SQL", "Postgres", "German"] {
for order in ["MDY", "DMY", "YMD"] {
let _ = client
.simple_query(&format!("SET DATESTYLE='{}', '{}'", style, order))
.await
.expect("SET DATESTYLE ERROR");
let r = client.simple_query("SELECT ts FROM ts_test").await.unwrap();
let ts = get_row(r);
assert_eq!(
ts,
get_expected(timestamp, style, order),
"style: {}, order: {}",
style,
order
);
let r = client
.simple_query("SELECT d FROM date_test")
.await
.unwrap();
let d = get_row(r);
assert_eq!(
d,
get_expected(date, style, order),
"style: {}, order: {}",
style,
order
);
let r = client.simple_query("SELECT dt FROM dt_test").await.unwrap();
let dt = get_row(r);
assert_eq!(
dt,
get_expected(datetime, style, order),
"style: {}, order: {}",
style,
order
);
}
}
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_intervalstyle(store_type: StorageType) {
let (mut guard, fe_pg_server) =
setup_pg_server(store_type, "test_postgres_intervalstyle").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let validate_intervalstyle = |client: Client, intervalstyle: &str, is_valid: bool| {
let intervalstyle = intervalstyle.to_string();
async move {
assert_eq!(
client
.simple_query(format!("SET INTERVALSTYLE='{}'", intervalstyle).as_str())
.await
.is_ok(),
is_valid,
"testing intervalstyle {intervalstyle}"
);
client
}
};
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
match &mess[1] {
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
_ => unreachable!(),
}
};
let client = validate_intervalstyle(client, "iso_8601", true).await;
let client = validate_intervalstyle(client, "sql_standard", true).await;
let client = validate_intervalstyle(client, "postgres", true).await;
let client = validate_intervalstyle(client, "postgres_verbose", true).await;
let client = validate_intervalstyle(client, "invalid_style", false).await;
let expected_formats: HashMap<&str, &str> = HashMap::from([
("iso_8601", "P1DT2H3M"),
("sql_standard", "1 2:03:00"),
("postgres", "1 day 02:03:00"),
("postgres_verbose", "@ 1 day 2 hours 3 mins"),
]);
for (style, expected_format) in expected_formats {
let _ = client
.simple_query(&format!("SET INTERVALSTYLE='{}'", style))
.await
.expect("SET INTERVALSTYLE ERROR");
let interval = get_row(
client
.simple_query("SHOW VARIABLES intervalstyle")
.await
.unwrap(),
);
assert_eq!(interval, style);
let result = get_row(
client
.simple_query("SELECT INTERVAL '1 day 2 hours 3 minutes'")
.await
.unwrap(),
);
assert_eq!(
result, expected_format,
"intervalstyle {}: expected '{}', got '{}'",
style, expected_format, result
);
}
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_timezone(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_timezone").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
match &mess[1] {
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
_ => unreachable!(),
}
};
let _ = client.simple_query("SET time_zone = 'UTC'").await.unwrap();
let timezone = get_row(
client
.simple_query("SHOW VARIABLES time_zone")
.await
.unwrap(),
);
assert_eq!(timezone, "UTC");
let timezone = get_row(
client
.simple_query("SHOW VARIABLES system_time_zone")
.await
.unwrap(),
);
assert_eq!(timezone, "UTC");
let _ = client
.simple_query("SET time_zone = 'Asia/Shanghai'")
.await
.unwrap();
let timezone = get_row(
client
.simple_query("SHOW VARIABLES time_zone")
.await
.unwrap(),
);
assert_eq!(timezone, "Asia/Shanghai");
let timezone = get_row(
client
.simple_query("SHOW VARIABLES system_time_zone")
.await
.unwrap(),
);
assert_eq!(timezone, "UTC");
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_parameter_inference(store_type: StorageType) {
let (mut guard, fe_pg_server) =
setup_pg_server(store_type, "test_postgres_parameter_inference").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
// Create demo table
let _ = client
.simple_query("create table demo(i bigint, ts timestamp time index, d date, dt datetime)")
.await
.unwrap();
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
let dt = d.and_hms_opt(0, 0, 0).unwrap();
let _ = client
.execute(
"INSERT INTO demo VALUES($1, $2, $3, $4)",
&[&0i64, &dt, &d, &dt],
)
.await
.unwrap();
let rows = client
.query("SELECT * FROM demo WHERE i = $1", &[&0i64])
.await
.unwrap();
assert_eq!(1, rows.len());
// Shutdown the client.
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_uint64_parameter(store_type: StorageType) {
let (mut guard, fe_pg_server) =
setup_pg_server(store_type, "test_postgres_uint64_parameter").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let _ = client
.simple_query("create table demo_u64(v bigint unsigned, ts timestamp time index)")
.await
.unwrap();
let dt = NaiveDate::from_yo_opt(2015, 100)
.unwrap()
.and_hms_opt(0, 0, 0)
.unwrap();
let _ = client
.execute(
"INSERT INTO demo_u64 VALUES($1, $2)",
&[&Decimal::from(123456u64), &dt],
)
.await
.unwrap();
let rows = client
.query(
"SELECT count(*) FROM demo_u64 WHERE v = $1",
&[&Decimal::from(123456u64)],
)
.await
.unwrap();
assert_eq!(1, rows.len());
let count: i64 = rows[0].get(0);
assert_eq!(count, 1);
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_explain_bind_parameter(store_type: StorageType) {
// Regression test for #8029: EXPLAIN / EXPLAIN ANALYZE must accept bind
// parameters over the Postgres extended query protocol.
let (mut guard, fe_pg_server) =
setup_pg_server(store_type, "test_postgres_explain_bind_parameter").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let _ = client
.simple_query(
"create table t (k varchar(36) not null, ts timestamp(3) not null, time index(ts))",
)
.await
.unwrap();
let _ = client
.simple_query("insert into t (k, ts) values ('a', 1), ('b', 2), ('c', 3)")
.await
.unwrap();
// Sanity check: the underlying SELECT with a bind parameter works.
let rows = client
.query("SELECT k FROM t WHERE k = $1", &[&"a"])
.await
.unwrap();
assert_eq!(1, rows.len());
// EXPLAIN with a bind parameter must succeed.
let rows = client
.query("EXPLAIN SELECT k FROM t WHERE k = $1", &[&"a"])
.await
.unwrap();
assert!(!rows.is_empty(), "EXPLAIN should produce at least one row");
// EXPLAIN ANALYZE with a bind parameter must also succeed.
let rows = client
.query("EXPLAIN ANALYZE SELECT k FROM t WHERE k = $1", &[&"a"])
.await
.unwrap();
assert!(
!rows.is_empty(),
"EXPLAIN ANALYZE should produce at least one row"
);
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_async_timestamp(store_type: StorageType) {
use mysql_async::prelude::*;
use time::PrimitiveDateTime;
#[derive(Debug)]
struct CpuMetric {
hostname: String,
environment: String,
usage_user: f64,
usage_system: f64,
usage_idle: f64,
ts: i64,
}
impl CpuMetric {
fn new(
hostname: String,
environment: String,
usage_user: f64,
usage_system: f64,
usage_idle: f64,
ts: i64,
) -> Self {
Self {
hostname,
environment,
usage_user,
usage_system,
usage_idle,
ts,
}
}
}
common_telemetry::init_default_ut_logging();
let (mut guard, fe_mysql_server) =
setup_mysql_server(store_type, "test_mysql_async_timestamp").await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
let url = format!("mysql://{addr}/public");
let opts = mysql_async::Opts::from_url(&url).unwrap();
let mut conn = mysql_async::Conn::new(opts)
.await
.expect("create connection failure");
r"CREATE TABLE IF NOT EXISTS cpu_metrics (
hostname STRING,
environment STRING,
usage_user DOUBLE,
usage_system DOUBLE,
usage_idle DOUBLE,
ts TIMESTAMP,
TIME INDEX(ts),
PRIMARY KEY(hostname, environment)
);"
.ignore(&mut conn)
.await
.expect("create table failure");
let metrics = [
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307200050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307200050,
),
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307260050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307260050,
),
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307320050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307320050,
),
];
r"INSERT INTO cpu_metrics (hostname, environment, usage_user, usage_system, usage_idle, ts)
VALUES (:hostname, :environment, :usage_user, :usage_system, :usage_idle, :ts)"
.with(metrics.iter().map(|metric| {
params! {
"hostname" => &metric.hostname,
"environment" => &metric.environment,
"usage_user" => metric.usage_user,
"usage_system" => metric.usage_system,
"usage_idle" => metric.usage_idle,
"ts" => metric.ts,
}
}))
.batch(&mut conn)
.await
.expect("insert data failure");
// query data
let loaded_metrics = "SELECT * FROM cpu_metrics"
.with(())
.map(
&mut conn,
|(hostname, environment, usage_user, usage_system, usage_idle, raw_ts): (
String,
String,
f64,
f64,
f64,
PrimitiveDateTime,
)| {
let ts = raw_ts.assume_utc().unix_timestamp() * 1000;
CpuMetric::new(
hostname,
environment,
usage_user,
usage_system,
usage_idle,
ts,
)
},
)
.await
.expect("query data failure");
assert_eq!(loaded_metrics.len(), 6);
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_prepare_stmt_insert_timestamp(store_type: StorageType) {
let (mut guard, server) =
setup_mysql_server(store_type, "test_mysql_prepare_stmt_insert_timestamp").await;
let addr = server.bind_addr().unwrap().to_string();
let pool = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
sqlx::query("create table demo(i bigint, ts timestamp time index)")
.execute(&pool)
.await
.unwrap();
// Valid timestamp binary encoding: https://mariadb.com/kb/en/resultset-row/#timestamp-binary-encoding
// Timestamp data length = 4, year-month-day(ymd) only:
sqlx::query("insert into demo values(?, ?)")
.bind(0)
.bind(
NaiveDate::from_ymd_opt(2023, 12, 19)
// Though hour, minute and second are provided, `sqlx` will not encode them if they are all zeroes,
// which is just what we desire here.
// See https://github.com/launchbadge/sqlx/blob/bb064e3789d68ad4e9affe7cba34944abb000f72/sqlx-core/src/mysql/types/chrono.rs#L186C22-L186C22
.and_then(|x| x.and_hms_opt(0, 0, 0))
.unwrap(),
)
.execute(&pool)
.await
.unwrap();
// Timestamp data length = 7, ymd and hour-minute-second(hms):
sqlx::query("insert into demo values(?, ?)")
.bind(1)
.bind(
NaiveDate::from_ymd_opt(2023, 12, 19)
.and_then(|x| x.and_hms_opt(13, 19, 1))
.unwrap(),
)
.execute(&pool)
.await
.unwrap();
// Timestamp data length = 11, ymd, hms and microseconds:
sqlx::query("insert into demo values(?, ?)")
.bind(2)
.bind(
NaiveDate::from_ymd_opt(2023, 12, 19)
.and_then(|x| x.and_hms_micro_opt(13, 20, 1, 123456))
.unwrap(),
)
.execute(&pool)
.await
.unwrap();
let rows = sqlx::query("select i, ts from demo order by i")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 3);
let x: DateTime<Utc> = rows[0].get("ts");
assert_eq!(x.to_string(), "2023-12-19 00:00:00 UTC");
let x: DateTime<Utc> = rows[1].get("ts");
assert_eq!(x.to_string(), "2023-12-19 13:19:01 UTC");
let x: DateTime<Utc> = rows[2].get("ts");
assert_eq!(x.to_string(), "2023-12-19 13:20:01.123 UTC");
let _ = server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_array_types(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_array_types").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let rows = client
.query(
"SELECT arrow_cast(1, 'List(Int8)'), arrow_cast('tom', 'List(Utf8)'), arrow_cast(3.14, 'List(Float32)'), arrow_cast('2023-01-02T12:53:02', 'List(Timestamp(Millisecond, None))')",
&[],
)
.await
.unwrap();
assert_eq!(1, rows.len());
// Shutdown the client.
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_declare_fetch_close_cursor(store_type: StorageType) {
let (mut guard, fe_pg_server) =
setup_pg_server(store_type, "test_declare_fetch_close_cursor").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
client
.execute(
"DECLARE c1 CURSOR FOR SELECT * FROM numbers WHERE number > 2 LIMIT 50::bigint",
&[],
)
.await
.expect("declare cursor");
// duplicated cursor
assert!(
client
.execute("DECLARE c1 CURSOR FOR SELECT 1", &[],)
.await
.is_err()
);
let rows = client.query("FETCH 5 FROM c1", &[]).await.unwrap();
assert_eq!(5, rows.len());
let rows = client.query("FETCH 100 FROM c1", &[]).await.unwrap();
assert_eq!(45, rows.len());
let rows = client.query("FETCH 100 FROM c1", &[]).await.unwrap();
assert_eq!(0, rows.len());
client.execute("CLOSE c1", &[]).await.expect("close cursor");
// cursor not found
let result = client.query("FETCH 100 FROM c1", &[]).await;
assert!(result.is_err());
client
.execute(
"DECLARE c2 CURSOR FOR SELECT * FROM numbers WHERE number < 0",
&[],
)
.await
.expect("declare cursor");
let rows = client.query("FETCH 5 FROM c2", &[]).await.unwrap();
assert_eq!(0, rows.len());
client.execute("CLOSE c2", &[]).await.expect("close cursor");
// Shutdown the client.
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}