greptimedb/tests-integration/tests/sql.rs
Weny Xu 8d2f92c01a chore: cherry pick fixes and bump version to v1.0.1 (#8024)
This release cherry-picks the following changes:

* fix: remap peer addresses during retries (#7933)
* fix: use the uint64 datatype for postgres prepared statement parameters (#7942); adds support for decimal and unsigned bigint parameter types and removes the string-replacement fallback
* fix: fix current version comparison logic for pre-releases (#7946)
* fix(index): intersect bitmaps before early exit in predicates applier (#7867); the loop skipped intersecting when the next bitmap was empty, which left the accumulator unchanged instead of zeroing it. The fix intersects first, then breaks when the result is empty.
* fix: randomize standalone test ports in cli export test (#7955); adds `random_standalone_addrs` and `choose_random_unused_port_offset` to dynamically select unused ports, and updates `test_export_create_table_with_quoted_names` to use dynamically assigned HTTP, RPC, MySQL, and PostgreSQL addresses
* chore: fix git cliff errors in latest version (#7947); fixes errors in v2.12.0 and no longer generates logs for beta/rc tags between the compared commits
* fix: match term zh (#7952); also adds Unicode Script Han handling
* ci: set upload timeout for uploading artifacts to S3 (#7958)
* fix: cargo check -p common-meta (#7964); fixes the moka feature
* fix: always skip field pruning when using merge mode (#7957)
* fix: mysql prepare correctly returns an error instead of panicking (#7963); the mysql writer now supports multiple statement execution
* fix: relax azblob validation requirements (#7970)
* feat(mito2): allow CompactionOutput to succeed independently (#7948); runs compaction tasks in parallel with per-task error capture, includes expired SSTs in file removal after merges, extracts an `SstMerger` trait so `DefaultCompactor` can be unit-tested with a `MockMerger`, propagates join errors during SST flush, and pre-allocates capacity for `compacted_inputs`
* feat: propagate staging leader through lease and heartbeat (#7950)
* feat: cancel local compaction for enter staging (#7885)
* refactor: move group rollback ownership to parent repartition (#7967); BREAKING CHANGE: the group procedure no longer handles rollback; the parent procedure owns crash recovery and selects which plans to roll back
* feat: use PreFilterMode::All if only one source in the partition range (#7973)
* fix(meta): renew operating region leases from keeper roles (#7971)
* feat: add an index page (#7975)
* fix: remove redundant error messages in admin functions (#7953); closes #7938
* perf: better jieba cut (#7984)
* fix: allow ipv4_num_to_string to accept valid integers (#7994)
* fix: update manifest state before deleting delta files (#8001)
* fix: upgrade mysql metadata value limit to mediumblob (#7985); also fails mysql metadata startup on upgrade-check errors
* fix: zh same underscore behavior (#8002)
* fix: manifest recovery scans after last version if possible (#8009)
* chore: add a standalone flag in plugins during startup (#7974)
* chore: bump version to v1.0.1

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: liyang <daviderli614@gmail.com>
Signed-off-by: Weixie Cui <cuiweixie@gmail.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: luofucong <luofc@foxmail.com>
Signed-off-by: yxrxy <yxrxytrigger@gmail.com>
Signed-off-by: Johannes Sluis <joesluis51@gmail.com>
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
Co-authored-by: liyang <daviderli614@gmail.com>
Co-authored-by: cui <cuiweixie@gmail.com>
Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: discord9 <discord9@163.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: fys <40801205+fengys1996@users.noreply.github.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
Co-authored-by: yxrxy <1532529704@qq.com>
Co-authored-by: Joe Sluis <43276756+JoeS51@users.noreply.github.com>
Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2026-04-23 17:37:27 +08:00
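
The intersect-before-break pattern from the predicates-applier fix (#7867) can be sketched as follows. This is an illustrative sketch only: `Bitmap` is a `BTreeSet` stand-in for the real index bitmap type, and `intersect_all` is a hypothetical helper, not GreptimeDB's actual code.

```rust
use std::collections::BTreeSet;

// Stand-in for the real bitmap type used by the index applier.
type Bitmap = BTreeSet<u32>;

// Intersect every bitmap into the accumulator *before* checking for
// emptiness. The buggy version skipped the intersection when the next
// bitmap was empty, leaving the accumulator unchanged instead of zeroing it.
fn intersect_all(bitmaps: &[Bitmap]) -> Option<Bitmap> {
    let mut iter = bitmaps.iter();
    let mut acc = iter.next()?.clone();
    for bm in iter {
        // Intersect first, then early-exit when the result is empty.
        acc = acc.intersection(bm).cloned().collect();
        if acc.is_empty() {
            break;
        }
    }
    Some(acc)
}

fn main() {
    let a: Bitmap = [1u32, 2, 3].into_iter().collect();
    let b: Bitmap = [2u32, 3, 4].into_iter().collect();
    // An empty bitmap must zero the result, not be skipped.
    assert!(intersect_all(&[a.clone(), Bitmap::new()]).unwrap().is_empty());
    assert_eq!(
        intersect_all(&[a, b]).unwrap(),
        [2u32, 3].into_iter().collect::<Bitmap>()
    );
}
```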

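
Dynamic port selection of the kind used in the cli export test fix (#7955) is usually done by binding port 0 and letting the OS pick an unused ephemeral port. A minimal sketch, assuming nothing about GreptimeDB's actual `random_standalone_addrs` implementation (the helper name below is hypothetical):

```rust
use std::net::TcpListener;

// Hypothetical helper: ask the OS for a currently unused TCP port by
// binding port 0 on loopback and reading back the assigned port.
fn random_unused_port() -> u16 {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    listener.local_addr().unwrap().port()
    // The listener is dropped here, freeing the port for the test server;
    // there is a small race window before the server rebinds it.
}

fn main() {
    let port = random_unused_port();
    assert!(port > 0);
    let addr = format!("127.0.0.1:{port}");
    println!("standalone test address: {addr}");
}
```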

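The manifest-recovery change (#8009) scans storage entries starting after the last known version rather than listing everything. The filtering idea can be sketched generically; the function below is a hypothetical illustration over plain strings, not the object-store `list_with_start_after` API itself:

```rust
// Hypothetical sketch of "scan with start_after": keep only manifest file
// names that sort strictly after the last recovered version's file, so
// recovery does not re-read deltas it already applied.
fn list_with_start_after<'a>(entries: &'a [&'a str], start_after: &str) -> Vec<&'a str> {
    let mut newer: Vec<&str> = entries
        .iter()
        .copied()
        .filter(|e| *e > start_after)
        .collect();
    // Apply deltas in version order.
    newer.sort_unstable();
    newer
}

fn main() {
    let entries = ["00000001.json", "00000003.json", "00000002.json"];
    let newer = list_with_start_after(&entries, "00000001.json");
    assert_eq!(newer, vec!["00000002.json", "00000003.json"]);
}
```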
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::Duration;

use auth::user_provider_from_option;
use chrono::{DateTime, NaiveDate, NaiveDateTime, SecondsFormat, Utc};
use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
use common_frontend::slow_query_event::{
    SLOW_QUERY_TABLE_COST_COLUMN_NAME, SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
    SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
    SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
};
use sqlx::mysql::{MySqlConnection, MySqlDatabaseError, MySqlPoolOptions};
use sqlx::postgres::{PgDatabaseError, PgPoolOptions};
use sqlx::types::Decimal;
use sqlx::{Connection, Executor, Row};
use tests_integration::test_util::{
    StorageType, setup_mysql_server, setup_mysql_server_with_slow_query_threshold,
    setup_mysql_server_with_user_provider, setup_pg_server,
    setup_pg_server_with_slow_query_threshold, setup_pg_server_with_user_provider,
};
use tokio_postgres::{Client, NoTls, SimpleQueryMessage};

#[macro_export]
macro_rules! sql_test {
    ($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
        paste::item! {
            mod [<integration_sql_ $service:lower _test>] {
                $(
                    #[tokio::test(flavor = "multi_thread")]
                    $(
                        #[$meta]
                    )*
                    async fn [< $test >]() {
                        common_telemetry::init_default_ut_logging();
                        let store_type = tests_integration::test_util::StorageType::$service;
                        if store_type.test_on() {
                            common_telemetry::info!("test {} starts, store_type: {:?}", stringify!($test), store_type);
                            let _ = $crate::sql::$test(store_type).await;
                        }
                    }
                )*
            }
        }
    };
}

#[macro_export]
macro_rules! sql_tests {
    ($($service:ident),*) => {
        $(
            sql_test!(
                $service,
                test_mysql_auth,
                test_mysql_stmts,
                test_mysql_crud,
                test_mysql_timezone,
                test_mysql_async_timestamp,
                test_mysql_slow_query,
                test_postgres_auth,
                test_postgres_crud,
                test_postgres_timezone,
                test_postgres_bytea,
                test_postgres_slow_query,
                test_postgres_datestyle,
                test_postgres_intervalstyle,
                test_postgres_parameter_inference,
                test_postgres_uint64_parameter,
                test_postgres_array_types,
                test_mysql_prepare_stmt_insert_timestamp,
                test_declare_fetch_close_cursor,
                test_alter_update_on,
            );
        )*
    };
}

pub async fn test_mysql_auth(store_type: StorageType) {
    let user_provider = user_provider_from_option(
        "static_user_provider:cmd:greptime_user=greptime_pwd,readonly_user:ro=readonly_pwd,writeonly_user:wo=writeonly_pwd",
    )
    .unwrap();

    let (mut guard, fe_mysql_server) =
        setup_mysql_server_with_user_provider(store_type, "sql_crud", Some(user_provider)).await;
    let addr = fe_mysql_server.bind_addr().unwrap().to_string();

    // 1. no auth
    let conn_re = MySqlPoolOptions::new()
        .max_connections(2)
        .connect(&format!("mysql://{addr}/public"))
        .await;
    assert!(conn_re.is_err());
    assert_eq!(
        conn_re
            .err()
            .unwrap()
            .into_database_error()
            .unwrap()
            .downcast::<MySqlDatabaseError>()
            .code(),
        Some("28000")
    );

    // 2. wrong pwd
    let conn_re = MySqlPoolOptions::new()
        .max_connections(2)
        .connect(&format!("mysql://greptime_user:wrong_pwd@{addr}/public"))
        .await;
    assert!(conn_re.is_err());
    assert_eq!(
        conn_re
            .err()
            .unwrap()
            .into_database_error()
            .unwrap()
            .downcast::<MySqlDatabaseError>()
            .code(),
        Some("28000")
    );

    // 3. right pwd
    let conn_re = MySqlPoolOptions::new()
        .max_connections(2)
        .connect(&format!("mysql://greptime_user:greptime_pwd@{addr}/public"))
        .await;
    assert!(conn_re.is_ok());

    // 4. readonly user
    let conn_re = MySqlPoolOptions::new()
        .max_connections(2)
        .connect(&format!("mysql://readonly_user:readonly_pwd@{addr}/public"))
        .await;
    assert!(conn_re.is_ok());
    let pool = conn_re.unwrap();
    let _ = pool.execute("SELECT 1").await.unwrap();
    let err = pool
        .execute("CREATE TABLE test (ts timestamp time index)")
        .await
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("(PermissionDenied): User is not authorized to perform this action"),
        "{}",
        err.to_string()
    );

    // 5. writeonly user
    let conn_re = MySqlPoolOptions::new()
        .max_connections(2)
        .connect(&format!(
            "mysql://writeonly_user:writeonly_pwd@{addr}/public"
        ))
        .await;
    assert!(conn_re.is_ok());
    let pool = conn_re.unwrap();
    let _ = pool
        .execute("CREATE TABLE test (ts timestamp time index)")
        .await
        .unwrap();
    let err = pool.execute("SHOW TABLES").await.unwrap_err();
    assert!(
        err.to_string()
            .contains("(PermissionDenied): User is not authorized to perform this action"),
        "{}",
        err.to_string()
    );

    let _ = fe_mysql_server.shutdown().await;
    guard.remove_all().await;
}

pub async fn test_mysql_stmts(store_type: StorageType) {
    common_telemetry::init_default_ut_logging();
    let (mut guard, fe_mysql_server) = setup_mysql_server(store_type, "test_mysql_stmts").await;
    let addr = fe_mysql_server.bind_addr().unwrap().to_string();

    let mut conn = MySqlConnection::connect(&format!("mysql://{addr}/public"))
        .await
        .unwrap();

    conn.execute("SET SESSION TRANSACTION READ ONLY")
        .await
        .unwrap();
    conn.execute("SET TRANSACTION READ ONLY").await.unwrap();

    // empty statements
    // Only when "--" is followed by a whitespace is it considered a valid comment in MySQL,
    // see https://dev.mysql.com/doc/refman/8.4/en/ansi-diff-comments.html
    let err = conn.execute(" -- ----- ;").await.unwrap_err();
    assert!(err.to_string().contains("empty statements"));
    let err = conn.execute("-- --------\n;").await.unwrap_err();
    assert!(err.to_string().contains("empty statements"));
    let err = conn.execute(" ;").await.unwrap_err();
    assert!(err.to_string().contains("empty statements"));
    let err = conn.execute(" \n ;").await.unwrap_err();
    assert!(err.to_string().contains("empty statements"));

    let _ = fe_mysql_server.shutdown().await;
    guard.remove_all().await;
}

pub async fn test_mysql_crud(store_type: StorageType) {
    common_telemetry::init_default_ut_logging();
    let (mut guard, fe_mysql_server) = setup_mysql_server(store_type, "test_mysql_crud").await;
    let addr = fe_mysql_server.bind_addr().unwrap().to_string();

    let pool = MySqlPoolOptions::new()
        .max_connections(2)
        .connect(&format!("mysql://{addr}/public"))
        .await
        .unwrap();

    sqlx::query(
        "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt timestamp(3) default null, b blob default null, j json, v vector(3) default null)",
    )
    .execute(&pool)
    .await
    .unwrap();

    for i in 0..10 {
        let dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
            chrono::DateTime::from_timestamp(60, i).unwrap().naive_utc(),
            Utc,
        );
        let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
        let hello = format!("hello{i}");
        let bytes = hello.as_bytes();
        let json = serde_json::json!({
            "code": i,
            "success": true,
            "payload": {
                "features": [
                    "serde",
                    "json"
                ],
                "homepage": null
            }
        });
        let vector = "[1,2,3]";
        sqlx::query("insert into demo values(?, ?, ?, ?, ?, ?, ?)")
            .bind(i)
            .bind(i)
            .bind(d)
            .bind(dt)
            .bind(bytes)
            .bind(json)
            .bind(vector)
            .execute(&pool)
            .await
            .unwrap();
    }

    let rows = sqlx::query("select i, d, dt, b, j, v from demo")
        .fetch_all(&pool)
        .await
        .unwrap();
    assert_eq!(rows.len(), 10);

    for (i, row) in rows.iter().enumerate() {
        let ret: i64 = row.get("i");
        let d: NaiveDate = row.get("d");
        let dt: DateTime<Utc> = row.get("dt");
        let bytes: Vec<u8> = row.get("b");
        let json: serde_json::Value = row.get("j");
        let vector: Vec<u8> = row.get("v");
        assert_eq!(ret, i as i64);

        let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
        assert_eq!(expected_d, d);

        let expected_dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
            chrono::DateTime::from_timestamp(60, i as u32)
                .unwrap()
                .naive_utc(),
            Utc,
        );
        assert_eq!(
            format!("{}", expected_dt.format("%Y-%m-%d %H:%M:%S")),
            format!("{}", dt.format("%Y-%m-%d %H:%M:%S"))
        );

        assert_eq!(format!("hello{i}"), String::from_utf8_lossy(&bytes));

        let expected_j = serde_json::json!({
            "code": i,
            "success": true,
            "payload": {
                "features": [
                    "serde",
                    "json"
                ],
                "homepage": null
            }
        });
        assert_eq!(json, expected_j);
        assert_eq!(vector, "[1,2,3]".as_bytes());
    }

    let rows = sqlx::query("select i from demo where i=?")
        .bind(6)
        .fetch_all(&pool)
        .await
        .unwrap();
    assert_eq!(rows.len(), 1);
    for row in rows {
        let ret: i64 = row.get("i");
        assert_eq!(ret, 6);
    }

    // parameter type mismatch
    let query_re = sqlx::query("select i from demo where i = ?")
        .bind("test")
        .fetch_all(&pool)
        .await;
    assert!(query_re.is_err());
    let err = query_re.unwrap_err();
    common_telemetry::info!("Error is {}", err);
    assert_eq!(
        err.into_database_error()
            .unwrap()
            .downcast::<MySqlDatabaseError>()
            .number(),
        1210,
    );

    let _ = sqlx::query("delete from demo")
        .execute(&pool)
        .await
        .unwrap();

    let rows = sqlx::query("select i from demo")
        .fetch_all(&pool)
        .await
        .unwrap();
    assert_eq!(rows.len(), 0);

    // test prepare with default columns
    sqlx::query("insert into demo(i) values(?)")
        .bind(99)
        .execute(&pool)
        .await
        .unwrap();
    sqlx::query("insert into demo(i) values(?)")
        .bind(-99)
        .execute(&pool)
        .await
        .unwrap();
    let rows = sqlx::query("select * from demo")
        .fetch_all(&pool)
        .await
        .unwrap();
    assert_eq!(rows.len(), 2);
    for row in rows {
        let i: i64 = row.get("i");
        let ts: DateTime<Utc> = row.get("ts");
        let now = common_time::util::current_time_millis();
        assert!(now - ts.timestamp_millis() < 1000);
        assert_eq!(i.abs(), 99);
    }

    let _ = fe_mysql_server.shutdown().await;
    guard.remove_all().await;
}

pub async fn test_mysql_timezone(store_type: StorageType) {
    common_telemetry::init_default_ut_logging();
    let (mut guard, fe_mysql_server) = setup_mysql_server(store_type, "test_mysql_timezone").await;
    let addr = fe_mysql_server.bind_addr().unwrap().to_string();

    let mut conn = MySqlConnection::connect(&format!("mysql://{addr}/public"))
        .await
        .unwrap();

    let _ = conn
        .execute("SET time_zone = 'Asia/Shanghai'")
        .await
        .unwrap();
    let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
    assert_eq!(timezone[0].get::<String, usize>(0), "Asia/Shanghai");
    let timezone = conn.fetch_all("SELECT @@session.time_zone").await.unwrap();
    assert_eq!(timezone[0].get::<String, usize>(0), "Asia/Shanghai");
    let timezone = conn.fetch_all("SELECT @@system_time_zone").await.unwrap();
    assert_eq!(timezone[0].get::<String, usize>(0), "UTC");

    let _ = conn.execute("SET time_zone = 'UTC'").await.unwrap();
    let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
    assert_eq!(timezone[0].get::<String, usize>(0), "UTC");
    let timezone = conn.fetch_all("SELECT @@system_time_zone").await.unwrap();
    assert_eq!(timezone[0].get::<String, usize>(0), "UTC");

    // test data
    let _ = conn
        .execute("create table demo(i bigint, ts timestamp time index)")
        .await
        .unwrap();
    let _ = conn
        .execute("insert into demo values(1, 1667446797450)")
        .await
        .unwrap();
    let rows = conn.fetch_all("select ts from demo").await.unwrap();
    assert_eq!(
        rows[0]
            .get::<chrono::DateTime<Utc>, usize>(0)
            .to_rfc3339_opts(SecondsFormat::Millis, true),
        "2022-11-03T03:39:57.450Z"
    );

    let _ = conn.execute("SET time_zone = '+08:00'").await.unwrap();
    let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
    assert_eq!(timezone[0].get::<String, usize>(0), "+08:00");
    let timezone = conn.fetch_all("SELECT @@session.time_zone").await.unwrap();
    assert_eq!(timezone[0].get::<String, usize>(0), "+08:00");
    let rows2 = conn.fetch_all("select ts from demo").await.unwrap();
    // we use Utc here for format only
    assert_eq!(
        rows2[0]
            .get::<chrono::DateTime<Utc>, usize>(0)
            .to_rfc3339_opts(SecondsFormat::Millis, true),
        "2022-11-03T11:39:57.450Z"
    );

    let _ = conn
        .execute("SET @@SESSION.TIME_ZONE = '-7:00'")
        .await
        .unwrap();
    let rows2 = conn.fetch_all("select ts from demo").await.unwrap();
    // we use Utc here for format only
    assert_eq!(
        rows2[0]
            .get::<chrono::DateTime<Utc>, usize>(0)
            .to_rfc3339_opts(SecondsFormat::Millis, true),
        "2022-11-02T20:39:57.450Z"
    );
    let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
    assert_eq!(timezone[0].get::<String, usize>(0), "-07:00");
    let timezone = conn.fetch_all("SELECT @@session.time_zone").await.unwrap();
    assert_eq!(timezone[0].get::<String, usize>(0), "-07:00");

    let _ = fe_mysql_server.shutdown().await;
    guard.remove_all().await;
}

pub async fn test_postgres_auth(store_type: StorageType) {
    let user_provider =
        user_provider_from_option("static_user_provider:cmd:greptime_user=greptime_pwd").unwrap();

    let (mut guard, fe_pg_server) =
        setup_pg_server_with_user_provider(store_type, "sql_crud", Some(user_provider)).await;
    let addr = fe_pg_server.bind_addr().unwrap().to_string();

    // 1. no auth
    let conn_re = PgPoolOptions::new()
        .max_connections(2)
        .connect(&format!("postgres://{addr}/public"))
        .await;
    assert!(conn_re.is_err());
    assert_eq!(
        conn_re
            .err()
            .unwrap()
            .into_database_error()
            .unwrap()
            .downcast::<PgDatabaseError>()
            .code(),
        "28P01"
    );

    // 2. wrong pwd
    let conn_re = PgPoolOptions::new()
        .max_connections(2)
        .connect(&format!("postgres://greptime_user:wrong_pwd@{addr}/public"))
        .await;
    assert!(conn_re.is_err());
    assert_eq!(
        conn_re
            .err()
            .unwrap()
            .into_database_error()
            .unwrap()
            .downcast::<PgDatabaseError>()
            .code(),
        "28P01"
    );

    // 3. right pwd
    let conn_re = PgPoolOptions::new()
        .max_connections(2)
        .connect(&format!(
            "postgres://greptime_user:greptime_pwd@{addr}/public"
        ))
        .await;
    assert!(conn_re.is_ok());

    let _ = fe_pg_server.shutdown().await;
    guard.remove_all().await;
}

pub async fn test_alter_update_on(store_type: StorageType) {
    let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_crud").await;
    let addr = fe_pg_server.bind_addr().unwrap().to_string();

    let pool = PgPoolOptions::new()
        .max_connections(2)
        .connect(&format!("postgres://{addr}/public"))
        .await
        .unwrap();

    sqlx::query(
        "create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob)",
    )
    .execute(&pool)
    .await
    .unwrap();

    let row_before_alter = sqlx::query(
        "SELECT *
        FROM information_schema.tables WHERE table_name = $1;",
    )
    .bind("demo")
    .fetch_all(&pool)
    .await
    .unwrap();
    assert_eq!(row_before_alter.len(), 1);
    let before_row = &row_before_alter[0];
    let created_on: NaiveDateTime = before_row.get("create_time");
    let updated_on_before: NaiveDateTime = before_row.get("update_time");
    assert_eq!(created_on, updated_on_before);

    // Wait long enough for the alter below to produce a later `update_time`.
    std::thread::sleep(std::time::Duration::from_millis(1100));

    sqlx::query("alter table demo add column j json;")
        .execute(&pool)
        .await
        .unwrap();

    let row_after_alter = sqlx::query(
        "SELECT *
        FROM information_schema.tables WHERE table_name = $1;",
    )
    .bind("demo")
    .fetch_all(&pool)
    .await
    .unwrap();
    assert_eq!(row_after_alter.len(), 1);
    let after_row = &row_after_alter[0];
    let updated_on_after: NaiveDateTime = after_row.get("update_time");
    assert_ne!(updated_on_before, updated_on_after);

    let _ = sqlx::query("delete from demo")
        .execute(&pool)
        .await
        .unwrap();

    let _ = fe_pg_server.shutdown().await;
    guard.remove_all().await;
}

pub async fn test_postgres_crud(store_type: StorageType) {
    let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_crud").await;
    let addr = fe_pg_server.bind_addr().unwrap().to_string();

    let pool = PgPoolOptions::new()
        .max_connections(2)
        .connect(&format!("postgres://{addr}/public"))
        .await
        .unwrap();

    sqlx::query(
        "create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob, j json)",
    )
    .execute(&pool)
    .await
    .unwrap();

    for i in 0..10 {
        let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
        let dt = d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_micros();
        let bytes = "hello".as_bytes();
        let json = serde_json::json!({
            "code": i,
            "success": true,
            "payload": {
                "features": [
                    "serde",
                    "json"
                ],
                "homepage": null
            }
        });
        sqlx::query("insert into demo values($1, $2, $3, $4, $5, $6)")
            .bind(i)
            .bind(i)
            .bind(d)
            .bind(dt)
            .bind(bytes)
            .bind(json)
            .execute(&pool)
            .await
            .unwrap();
    }

    let rows = sqlx::query("select i,d,dt,b,j from demo")
        .fetch_all(&pool)
        .await
        .unwrap();
    assert_eq!(rows.len(), 10);

    for (i, row) in rows.iter().enumerate() {
        let ret: i64 = row.get("i");
        let d: NaiveDate = row.get("d");
        let dt: NaiveDateTime = row.get("dt");
        let bytes: Vec<u8> = row.get("b");
        let json: serde_json::Value = row.get("j");
        assert_eq!(ret, i as i64);

        let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
        assert_eq!(expected_d, d);

        let expected_dt = NaiveDate::from_yo_opt(2015, 100)
            .and_then(|d| d.and_hms_opt(0, 0, 0))
            .unwrap();
        assert_eq!(expected_dt, dt);

        assert_eq!("hello".as_bytes(), bytes);

        let expected_j = serde_json::json!({
            "code": i,
            "success": true,
            "payload": {
                "features": [
                    "serde",
                    "json"
                ],
                "homepage": null
            }
        });
        assert_eq!(json.to_string(), expected_j.to_string());
    }

    let rows = sqlx::query("select i from demo where i=$1")
        .bind(6)
        .fetch_all(&pool)
        .await
        .unwrap();
    assert_eq!(rows.len(), 1);
    for row in rows {
        let ret: i64 = row.get("i");
        assert_eq!(ret, 6);
    }

    let _ = sqlx::query("delete from demo")
        .execute(&pool)
        .await
        .unwrap();

    let rows = sqlx::query("select i from demo")
        .fetch_all(&pool)
        .await
        .unwrap();
    assert_eq!(rows.len(), 0);

    let _ = fe_pg_server.shutdown().await;
    guard.remove_all().await;
}
/// Verifies that a query running longer than the configured slow-query
/// threshold is recorded in the slow-query system table (MySQL protocol).
pub async fn test_mysql_slow_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (mut guard, fe_mysql_server) = setup_mysql_server_with_slow_query_threshold(
store_type,
"test_mysql_slow_query",
Duration::from_millis(100),
)
.await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
let pool = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
// Run a query that takes longer than the configured threshold so it gets
// captured by the slow-query recorder.
let slow_query = "SELECT count(*) FROM generate_series(1, 50000000)";
sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table} WHERE {} = ?",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
);
// The slow-query record is written asynchronously, so poll for it with a
// 10-second upper bound.
let row = tokio::time::timeout(Duration::from_secs(10), async {
loop {
if let Ok(Some(row)) = sqlx::query(&query)
.bind(slow_query)
.fetch_optional(&pool)
.await
{
break row;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
})
.await
.unwrap();
// Check the results.
let cost: u64 = row.get(0);
let threshold: u64 = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);
assert!(cost > 0 && threshold > 0 && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
/// Checks `bytea` rendering over the simple-query protocol under the `hex`
/// (default) and `escape` settings of `bytea_output`, and that the binary
/// protocol is unaffected by that session variable.
pub async fn test_postgres_bytea(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_bytea").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
// Drive the connection on a background task; the oneshot completes once the
// connection future finishes after `client` is dropped.
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let _ = client
.simple_query("CREATE TABLE test(b BLOB, ts TIMESTAMP TIME INDEX)")
.await
.unwrap();
let _ = client
.simple_query("INSERT INTO test VALUES(X'6162636b6c6d2aa954', 0)")
.await
.unwrap();
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
match &mess[1] {
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
_ => unreachable!(),
}
};
let r = client.simple_query("SELECT b FROM test").await.unwrap();
let b = get_row(r);
assert_eq!(b, "\\x6162636b6c6d2aa954");
let _ = client.simple_query("SET bytea_output='hex'").await.unwrap();
let r = client.simple_query("SELECT b FROM test").await.unwrap();
let b = get_row(r);
assert_eq!(b, "\\x6162636b6c6d2aa954");
let _ = client
.simple_query("SET bytea_output='escape'")
.await
.unwrap();
let r = client.simple_query("SELECT b FROM test").await.unwrap();
let b = get_row(r);
assert_eq!(b, "abcklm*\\251T");
let _e = client
.simple_query("SET bytea_output='invalid'")
.await
.unwrap_err();
// The binary (extended query) protocol must not be affected by `bytea_output`.
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await
.unwrap();
let row = sqlx::query("select b from test")
.fetch_one(&pool)
.await
.unwrap();
let val: Vec<u8> = row.get("b");
assert_eq!(val, [97, 98, 99, 107, 108, 109, 42, 169, 84]);
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
/// PostgreSQL-protocol counterpart of `test_mysql_slow_query`: a query running
/// longer than the threshold must show up in the slow-query system table.
pub async fn test_postgres_slow_query(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server_with_slow_query_threshold(
store_type,
"test_postgres_slow_query",
Duration::from_millis(100),
)
.await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await
.unwrap();
let slow_query = "SELECT count(*) FROM generate_series(1, 50000000)";
let _ = sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table} WHERE {} = $1",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
);
// Poll until the slow-query record appears, bounded by a 10-second timeout.
let row = tokio::time::timeout(Duration::from_secs(10), async {
loop {
if let Ok(Some(row)) = sqlx::query(&query)
.bind(slow_query)
.fetch_optional(&pool)
.await
{
break row;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
})
.await
.unwrap();
let cost: Decimal = row.get(0);
let threshold: Decimal = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);
assert!(cost > 0.into() && threshold > 0.into() && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
/// Validates `SET DATESTYLE` handling: which style/order combinations are
/// accepted, and how dates and timestamps render under each combination.
pub async fn test_postgres_datestyle(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_datestyle").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let validate_datestyle = |client: Client, datestyle: &str, is_valid: bool| {
let datestyle = datestyle.to_string();
async move {
assert_eq!(
client
.simple_query(format!("SET DATESTYLE={}", datestyle).as_str())
.await
.is_ok(),
is_valid
);
client
}
};
// A style followed by an order is valid.
let client = validate_datestyle(client, "'ISO,MDY'", true).await;
// Mixing quoted strings and identifiers is valid.
let client = validate_datestyle(client, "'ISO',MDY", true).await;
// A list of values is valid as long as nothing conflicts.
let client = validate_datestyle(client, "'ISO,MDY','ISO,MDY'", true).await;
// Conflicting styles ('ISO' vs 'German') are rejected.
let client = validate_datestyle(client, "'ISO,German'", false).await;
// Conflicting orders ('DMY' vs 'MDY') are rejected.
let client = validate_datestyle(client, "'ISO,DMY','ISO,MDY'", false).await;
// Repetition is fine as long as the values don't conflict.
let client = validate_datestyle(client, "ISO,ISO,ISO,ISO,ISO,MDY,MDY,MDY,MDY", true).await;
let _ = client
.simple_query("CREATE TABLE ts_test(ts TIMESTAMP TIME INDEX)")
.await
.expect("CREATE TABLE ts_test ERROR");
let _ = client
.simple_query("CREATE TABLE date_test(d date, ts TIMESTAMP TIME INDEX)")
.await
.expect("CREATE TABLE date_test ERROR");
let _ = client
.simple_query("CREATE TABLE dt_test(dt datetime, ts TIMESTAMP TIME INDEX)")
.await
.expect("CREATE TABLE dt_test ERROR");
let _ = client
.simple_query("INSERT INTO ts_test VALUES('1997-12-17 07:37:16.123')")
.await
.expect("INSERT INTO ts_test ERROR");
let _ = client
.simple_query("INSERT INTO date_test VALUES('1997-12-17', '1997-12-17 07:37:16.123')")
.await
.expect("INSERT INTO date_test ERROR");
let _ = client
.simple_query(
"INSERT INTO dt_test VALUES('1997-12-17 07:37:16.123', '1997-12-17 07:37:16.123')",
)
.await
.expect("INSERT INTO dt_test ERROR");
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
match &mess[1] {
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
_ => unreachable!("Unexpected messages: {:?}", mess),
}
};
let date = "DATE";
// `datetime` columns render the same way as timestamps, so both keys map to
// the same expected formats below.
let datetime = "TIMESTAMP";
let timestamp = "TIMESTAMP";
let iso = "ISO";
let sql = "SQL";
let postgres = "Postgres";
let german = "German";
let expected_set: HashMap<&str, HashMap<&str, HashMap<&str, &str>>> = HashMap::from([
(
date,
HashMap::from([
(
iso,
HashMap::from([
("MDY", "1997-12-17"),
("DMY", "1997-12-17"),
("YMD", "1997-12-17"),
]),
),
(
sql,
HashMap::from([
("MDY", "12/17/1997"),
("DMY", "17/12/1997"),
("YMD", "12/17/1997"),
]),
),
(
postgres,
HashMap::from([
("MDY", "12-17-1997"),
("DMY", "17-12-1997"),
("YMD", "12-17-1997"),
]),
),
(
german,
HashMap::from([
("MDY", "17.12.1997"),
("DMY", "17.12.1997"),
("YMD", "17.12.1997"),
]),
),
]),
),
(
timestamp,
HashMap::from([
(
iso,
HashMap::from([
("MDY", "1997-12-17 07:37:16.123000"),
("DMY", "1997-12-17 07:37:16.123000"),
("YMD", "1997-12-17 07:37:16.123000"),
]),
),
(
sql,
HashMap::from([
("MDY", "12/17/1997 07:37:16.123000"),
("DMY", "17/12/1997 07:37:16.123000"),
("YMD", "12/17/1997 07:37:16.123000"),
]),
),
(
postgres,
HashMap::from([
("MDY", "Wed Dec 17 07:37:16.123000 1997"),
("DMY", "Wed 17 Dec 07:37:16.123000 1997"),
("YMD", "Wed Dec 17 07:37:16.123000 1997"),
]),
),
(
german,
HashMap::from([
("MDY", "17.12.1997 07:37:16.123000"),
("DMY", "17.12.1997 07:37:16.123000"),
("YMD", "17.12.1997 07:37:16.123000"),
]),
),
]),
),
]);
let get_expected = |ty: &str, style: &str, order: &str| {
expected_set
.get(ty)
.and_then(|m| m.get(style))
.and_then(|m2| m2.get(order))
.unwrap()
.to_string()
};
for style in ["ISO", "SQL", "Postgres", "German"] {
for order in ["MDY", "DMY", "YMD"] {
let _ = client
.simple_query(&format!("SET DATESTYLE='{}', '{}'", style, order))
.await
.expect("SET DATESTYLE ERROR");
let r = client.simple_query("SELECT ts FROM ts_test").await.unwrap();
let ts = get_row(r);
assert_eq!(
ts,
get_expected(timestamp, style, order),
"style: {}, order: {}",
style,
order
);
let r = client
.simple_query("SELECT d FROM date_test")
.await
.unwrap();
let d = get_row(r);
assert_eq!(
d,
get_expected(date, style, order),
"style: {}, order: {}",
style,
order
);
let r = client.simple_query("SELECT dt FROM dt_test").await.unwrap();
let dt = get_row(r);
assert_eq!(
dt,
get_expected(datetime, style, order),
"style: {}, order: {}",
style,
order
);
}
}
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
/// Validates `SET INTERVALSTYLE`: accepted style names and the interval
/// rendering produced by each style.
pub async fn test_postgres_intervalstyle(store_type: StorageType) {
let (mut guard, fe_pg_server) =
setup_pg_server(store_type, "test_postgres_intervalstyle").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let validate_intervalstyle = |client: Client, intervalstyle: &str, is_valid: bool| {
let intervalstyle = intervalstyle.to_string();
async move {
assert_eq!(
client
.simple_query(format!("SET INTERVALSTYLE='{}'", intervalstyle).as_str())
.await
.is_ok(),
is_valid,
"testing intervalstyle {intervalstyle}"
);
client
}
};
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
match &mess[1] {
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
_ => unreachable!(),
}
};
let client = validate_intervalstyle(client, "iso_8601", true).await;
let client = validate_intervalstyle(client, "sql_standard", true).await;
let client = validate_intervalstyle(client, "postgres", true).await;
let client = validate_intervalstyle(client, "postgres_verbose", true).await;
let client = validate_intervalstyle(client, "invalid_style", false).await;
let expected_formats: HashMap<&str, &str> = HashMap::from([
("iso_8601", "P1DT2H3M"),
("sql_standard", "1 2:03:00"),
("postgres", "1 day 02:03:00"),
("postgres_verbose", "@ 1 day 2 hours 3 mins"),
]);
for (style, expected_format) in expected_formats {
let _ = client
.simple_query(&format!("SET INTERVALSTYLE='{}'", style))
.await
.expect("SET INTERVALSTYLE ERROR");
let interval = get_row(
client
.simple_query("SHOW VARIABLES intervalstyle")
.await
.unwrap(),
);
assert_eq!(interval, style);
let result = get_row(
client
.simple_query("SELECT INTERVAL '1 day 2 hours 3 minutes'")
.await
.unwrap(),
);
assert_eq!(
result, expected_format,
"intervalstyle {}: expected '{}', got '{}'",
style, expected_format, result
);
}
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
/// Checks that `SET time_zone` changes the session time zone while
/// `system_time_zone` stays fixed.
pub async fn test_postgres_timezone(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_timezone").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
match &mess[1] {
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
_ => unreachable!(),
}
};
let _ = client.simple_query("SET time_zone = 'UTC'").await.unwrap();
let timezone = get_row(
client
.simple_query("SHOW VARIABLES time_zone")
.await
.unwrap(),
);
assert_eq!(timezone, "UTC");
let timezone = get_row(
client
.simple_query("SHOW VARIABLES system_time_zone")
.await
.unwrap(),
);
assert_eq!(timezone, "UTC");
let _ = client
.simple_query("SET time_zone = 'Asia/Shanghai'")
.await
.unwrap();
let timezone = get_row(
client
.simple_query("SHOW VARIABLES time_zone")
.await
.unwrap(),
);
assert_eq!(timezone, "Asia/Shanghai");
// `system_time_zone` reflects the server's own time zone and is not affected
// by the session-level `SET time_zone`.
let timezone = get_row(
client
.simple_query("SHOW VARIABLES system_time_zone")
.await
.unwrap(),
);
assert_eq!(timezone, "UTC");
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
/// Ensures parameter types are inferred correctly for prepared statements
/// when the client does not declare them explicitly.
pub async fn test_postgres_parameter_inference(store_type: StorageType) {
let (mut guard, fe_pg_server) =
setup_pg_server(store_type, "test_postgres_parameter_inference").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
// Create demo table
let _ = client
.simple_query("create table demo(i bigint, ts timestamp time index, d date, dt datetime)")
.await
.unwrap();
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
let dt = d.and_hms_opt(0, 0, 0).unwrap();
let _ = client
.execute(
"INSERT INTO demo VALUES($1, $2, $3, $4)",
&[&0i64, &dt, &d, &dt],
)
.await
.unwrap();
let rows = client
.query("SELECT * FROM demo WHERE i = $1", &[&0i64])
.await
.unwrap();
assert_eq!(1, rows.len());
// Shutdown the client.
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
/// PostgreSQL has no unsigned 64-bit integer type, so clients bind a
/// `numeric` (`Decimal`) value for a `bigint unsigned` column; this test
/// verifies that such parameters round-trip correctly.
pub async fn test_postgres_uint64_parameter(store_type: StorageType) {
let (mut guard, fe_pg_server) =
setup_pg_server(store_type, "test_postgres_uint64_parameter").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let _ = client
.simple_query("create table demo_u64(v bigint unsigned, ts timestamp time index)")
.await
.unwrap();
let dt = NaiveDate::from_yo_opt(2015, 100)
.unwrap()
.and_hms_opt(0, 0, 0)
.unwrap();
let _ = client
.execute(
"INSERT INTO demo_u64 VALUES($1, $2)",
&[&Decimal::from(123456u64), &dt],
)
.await
.unwrap();
let rows = client
.query(
"SELECT count(*) FROM demo_u64 WHERE v = $1",
&[&Decimal::from(123456u64)],
)
.await
.unwrap();
assert_eq!(1, rows.len());
let count: i64 = rows[0].get(0);
assert_eq!(count, 1);
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
/// Inserts and reads back timestamped rows through the `mysql_async` client,
/// exercising named-parameter batch inserts and timestamp decoding.
pub async fn test_mysql_async_timestamp(store_type: StorageType) {
use mysql_async::prelude::*;
use time::PrimitiveDateTime;
#[derive(Debug)]
struct CpuMetric {
hostname: String,
environment: String,
usage_user: f64,
usage_system: f64,
usage_idle: f64,
ts: i64,
}
impl CpuMetric {
fn new(
hostname: String,
environment: String,
usage_user: f64,
usage_system: f64,
usage_idle: f64,
ts: i64,
) -> Self {
Self {
hostname,
environment,
usage_user,
usage_system,
usage_idle,
ts,
}
}
}
common_telemetry::init_default_ut_logging();
let (mut guard, fe_mysql_server) =
setup_mysql_server(store_type, "test_mysql_async_timestamp").await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
let url = format!("mysql://{addr}/public");
let opts = mysql_async::Opts::from_url(&url).unwrap();
let mut conn = mysql_async::Conn::new(opts)
.await
.expect("create connection failure");
r"CREATE TABLE IF NOT EXISTS cpu_metrics (
hostname STRING,
environment STRING,
usage_user DOUBLE,
usage_system DOUBLE,
usage_idle DOUBLE,
ts TIMESTAMP,
TIME INDEX(ts),
PRIMARY KEY(hostname, environment)
);"
.ignore(&mut conn)
.await
.expect("create table failure");
let metrics = [
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307200050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307200050,
),
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307260050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307260050,
),
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307320050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307320050,
),
];
r"INSERT INTO cpu_metrics (hostname, environment, usage_user, usage_system, usage_idle, ts)
VALUES (:hostname, :environment, :usage_user, :usage_system, :usage_idle, :ts)"
.with(metrics.iter().map(|metric| {
params! {
"hostname" => &metric.hostname,
"environment" => &metric.environment,
"usage_user" => metric.usage_user,
"usage_system" => metric.usage_system,
"usage_idle" => metric.usage_idle,
"ts" => metric.ts,
}
}))
.batch(&mut conn)
.await
.expect("insert data failure");
// Query the data back, decoding each row into a `CpuMetric`.
let loaded_metrics = "SELECT * FROM cpu_metrics"
.with(())
.map(
&mut conn,
|(hostname, environment, usage_user, usage_system, usage_idle, raw_ts): (
String,
String,
f64,
f64,
f64,
PrimitiveDateTime,
)| {
let ts = raw_ts.assume_utc().unix_timestamp() * 1000;
CpuMetric::new(
hostname,
environment,
usage_user,
usage_system,
usage_idle,
ts,
)
},
)
.await
.expect("query data failure");
assert_eq!(loaded_metrics.len(), 6);
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
/// Inserts timestamps through MySQL prepared statements, covering the three
/// binary encodings (date only, date + time, date + time + microseconds).
pub async fn test_mysql_prepare_stmt_insert_timestamp(store_type: StorageType) {
let (mut guard, server) =
setup_mysql_server(store_type, "test_mysql_prepare_stmt_insert_timestamp").await;
let addr = server.bind_addr().unwrap().to_string();
let pool = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
sqlx::query("create table demo(i bigint, ts timestamp time index)")
.execute(&pool)
.await
.unwrap();
// Valid timestamp binary encoding: https://mariadb.com/kb/en/resultset-row/#timestamp-binary-encoding
// Timestamp data length = 4, year-month-day(ymd) only:
sqlx::query("insert into demo values(?, ?)")
.bind(0)
.bind(
NaiveDate::from_ymd_opt(2023, 12, 19)
// Though hour, minute and second are provided, `sqlx` will not encode them if they are all zeroes,
// which is just what we desire here.
// See https://github.com/launchbadge/sqlx/blob/bb064e3789d68ad4e9affe7cba34944abb000f72/sqlx-core/src/mysql/types/chrono.rs#L186C22-L186C22
.and_then(|x| x.and_hms_opt(0, 0, 0))
.unwrap(),
)
.execute(&pool)
.await
.unwrap();
// Timestamp data length = 7, ymd and hour-minute-second(hms):
sqlx::query("insert into demo values(?, ?)")
.bind(1)
.bind(
NaiveDate::from_ymd_opt(2023, 12, 19)
.and_then(|x| x.and_hms_opt(13, 19, 1))
.unwrap(),
)
.execute(&pool)
.await
.unwrap();
// Timestamp data length = 11, ymd, hms and microseconds:
sqlx::query("insert into demo values(?, ?)")
.bind(2)
.bind(
NaiveDate::from_ymd_opt(2023, 12, 19)
.and_then(|x| x.and_hms_micro_opt(13, 20, 1, 123456))
.unwrap(),
)
.execute(&pool)
.await
.unwrap();
let rows = sqlx::query("select i, ts from demo order by i")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(rows.len(), 3);
let x: DateTime<Utc> = rows[0].get("ts");
assert_eq!(x.to_string(), "2023-12-19 00:00:00 UTC");
let x: DateTime<Utc> = rows[1].get("ts");
assert_eq!(x.to_string(), "2023-12-19 13:19:01 UTC");
let x: DateTime<Utc> = rows[2].get("ts");
assert_eq!(x.to_string(), "2023-12-19 13:20:01.123 UTC");
let _ = server.shutdown().await;
guard.remove_all().await;
}
/// Queries list-typed values produced by `arrow_cast` to ensure array types
/// are encoded correctly over the PostgreSQL protocol.
pub async fn test_postgres_array_types(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_array_types").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
let rows = client
.query(
"SELECT arrow_cast(1, 'List(Int8)'), arrow_cast('tom', 'List(Utf8)'), arrow_cast(3.14, 'List(Float32)'), arrow_cast('2023-01-02T12:53:02', 'List(Timestamp(Millisecond, None))')",
&[],
)
.await
.unwrap();
assert_eq!(1, rows.len());
// Shutdown the client.
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
/// Exercises the DECLARE / FETCH / CLOSE cursor lifecycle, including duplicate
/// cursor names, exhausted cursors, closed cursors, and empty result sets.
pub async fn test_declare_fetch_close_cursor(store_type: StorageType) {
let (mut guard, fe_pg_server) =
setup_pg_server(store_type, "test_declare_fetch_close_cursor").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
.await
.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
connection.await.unwrap();
tx.send(()).unwrap();
});
client
.execute(
"DECLARE c1 CURSOR FOR SELECT * FROM numbers WHERE number > 2 LIMIT 50::bigint",
&[],
)
.await
.expect("declare cursor");
// Declaring a cursor whose name is already in use must fail.
assert!(
client
.execute("DECLARE c1 CURSOR FOR SELECT 1", &[],)
.await
.is_err()
);
let rows = client.query("FETCH 5 FROM c1", &[]).await.unwrap();
assert_eq!(5, rows.len());
let rows = client.query("FETCH 100 FROM c1", &[]).await.unwrap();
assert_eq!(45, rows.len());
let rows = client.query("FETCH 100 FROM c1", &[]).await.unwrap();
assert_eq!(0, rows.len());
client.execute("CLOSE c1", &[]).await.expect("close cursor");
// Fetching from a closed cursor must fail.
let result = client.query("FETCH 100 FROM c1", &[]).await;
assert!(result.is_err());
client
.execute(
"DECLARE c2 CURSOR FOR SELECT * FROM numbers WHERE number < 0",
&[],
)
.await
.expect("declare cursor");
let rows = client.query("FETCH 5 FROM c2", &[]).await.unwrap();
assert_eq!(0, rows.len());
client.execute("CLOSE c2", &[]).await.expect("close cursor");
// Shutdown the client.
drop(client);
rx.await.unwrap();
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}