mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 03:50:39 +00:00
* fix: remap peer addresses during retries (#7933) * fix: remap peer addresses during retries Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: styling Signed-off-by: WenyXu <wenymedia@gmail.com> * test: add tests Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions from CR Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: using uint64 datatype for postgres prepared statement parameters (#7942) * feat: add support for decimal parameter type, remove string replacement fallback * chore: format * fix: add support for using unsigned bigint in postgres * chore: format toml * refactor: cleanup duplicated code * fix: rescale decimal Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: fix current version comparison logic for pre-releases (#7946) Signed-off-by: liyang <daviderli614@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix(index): intersect bitmaps before early exit in predicates applier (#7867) * fix(index): intersect bitmaps before early exit in predicates applier The loop skipped intersecting when the next bitmap was empty, which left the accumulator unchanged instead of zeroing it. Intersect first, then break when the result is empty. Signed-off-by: Weixie Cui <cuiweixie@gmail.com> * per gemini * style(index): format predicates applier loop * fix(index): remove unused mut in predicates applier --------- Signed-off-by: Weixie Cui <cuiweixie@gmail.com> Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com> Co-authored-by: discord9 <discord9@163.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: randomize standalone test ports in cli export test (#7955) fix/flaky-test: ### Add Dynamic Port Selection for Standalone Tests - **`cli.rs`**: Implemented functions `random_standalone_addrs` and `choose_random_unused_port_offset` to dynamically select unused ports for standalone tests, enhancing test reliability. - Updated `test_export_create_table_with_quoted_names` to use dynamically assigned ports for HTTP, RPC, MySQL, and PostgreSQL addresses. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: fix git cliff errors in latest version (#7947) * chore: fix git cliff errors in latest version - Fix errors in v2.12.0 - Do not generate logs for beta/rc tags between the compared commits Signed-off-by: evenyag <realevenyag@gmail.com> * chore: preserve blank line before release date in changelog Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: match term zh (#7952) * fix: match term zh Signed-off-by: discord9 <discord9@163.com> * chore: per gemini Signed-off-by: discord9 <discord9@163.com> * chore: revert accident change Signed-off-by: discord9 <discord9@163.com> * feat: unicode script han Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * ci: set upload timeout for uploading artifacts to S3 (#7958) * ci: set upload timeout for uploading artifacts to S3 Signed-off-by: liyang <daviderli614@gmail.com> * Update upload-artifacts-to-s3.sh --------- Signed-off-by: liyang <daviderli614@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: cargo check -p common-meta (#7964) fix: moka feature Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: always skip field pruning when using merge mode (#7957) * test: add prefilter regressions for last_row null filters Signed-off-by: evenyag <realevenyag@gmail.com> * fix: skip fields in all merge mode Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: simplify pre-filter skip fields handling Signed-off-by: evenyag <realevenyag@gmail.com> * test: update test Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: mysql prepare correctly returns error instead of panic (#7963) feat: mysql writer support multiple statement execution Signed-off-by: luofucong <luofc@foxmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: relax azblob validation requirements (#7970) Signed-off-by: WenyXu <wenymedia@gmail.com> * feat(mito2): allow CompactionOutput to succeed independently (#7948) * refactor(mito2): improve compaction error handling and file removal Refactor compaction task execution to enhance error handling and robustness. - Implemented parallel execution of compaction tasks with proper error capture and logging for individual task failures. - Ensured JoinSnafu is no longer directly used in error propagation, instead handling errors within the task processing loop. - Adjusted file removal logic to correctly include expired SSTs after compaction merges. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * refactor(mito2): extract SstMerger trait for testability in compaction Extract SstMerger trait and DefaultSstMerger implementation to improve the testability of DefaultCompactor. The DefaultCompactor is now generic over SstMerger, allowing mock implementations to be injected for unit testing without relying on the full object storage access layer. This refactoring separates the concerns of SST file merging from the overall compaction orchestration logic. Additionally: - Updated CompactionScheduler to use DefaultCompactor::default(). - Added unit tests for DefaultCompactor using a MockMerger. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix(compaction): propagate join error during sst flush Correctly propagates the error when joining SST flush handles during compaction. Previously, the error was logged but not returned, leading to potential silent failures. Also reorders some imports for consistency. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * perf(compaction): pre-allocate capacity for compacted_inputs Pre-allocates capacity for the compacted_inputs vector based on the estimated total size of inputs and expired SSTs. This optimization aims to reduce vector reallocations during the compaction process. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat/allow-partial-compaction: ### Commit Message Enhance `DefaultCompactor` and `MockMerger` for Improved Flexibility - **`compactor.rs`**: - Added `Clone` trait to `DefaultSstMerger` and `MockMerger` to allow cloning. - Removed `Arc` wrapping from `DefaultCompactor`'s `merger` field for direct usage. - Updated `merge_ssts` method to require `Clone` trait for `SstMerger`. - Modified `MockMerger` to use `Arc<Mutex>` for `results` and `call_idx` to ensure thread safety. - Adjusted error handling to use `error::InvalidMetaSnafu` directly. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: propagate staging leader through lease and heartbeat (#7950) * feat(mito): expose staging leader role state * fix(region): clear staging metadata on leader exit * feat: propagate staging leader role through heartbeat and metasrv * chore: update comments Signed-off-by: WenyXu <wenymedia@gmail.com> * fix(region): unify staging exit role transitions * chore: update proto Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: cancel local compaction for enter staging (#7885) * feat(mito2): support cancelling active local compaction Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions from CR Signed-off-by: WenyXu <wenymedia@gmail.com> * test(mito2): cover compaction cancellation return paths Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions from CR Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions from CR Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions from CR Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: cancel remaining tasks Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com> * refactor: move group rollback ownership to parent repartition (#7967) * refactor(meta-srv): move group rollback ownership to parent repartition procedure - Parent procedure now owns partial rollback based on failed/unknown subprocedures - rollback order: group metadata first, then allocated-region cleanup - original_target_routes captured during build-plan, persisted in RepartitionPlanEntry - rollback_group_metadata_routes moved to utils as parent-owned helper - Group subprocedure no longer supports rollback (rollback_supported = false) - Removed UpdateMetadata::RollbackStaging from group state machine - Deleted redundant group rollback tests and helpers BREAKING CHANGE: group Procedure no longer handles rollback; parent procedure is responsible for crash recovery and selecting which plans to roll back. Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: update comments Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions from CR Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: use PreFilterMode::All if only one source in the partition range (#7973) * feat: use PrefilterMode::All if only one source in the partition range Signed-off-by: evenyag <realevenyag@gmail.com> * fix: consider append_mode Signed-off-by: evenyag <realevenyag@gmail.com> * chore: skip merge if only one source Signed-off-by: evenyag <realevenyag@gmail.com> * test: fix test Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix(meta): renew operating region leases from keeper roles (#7971) * refactor(meta): store operating region roles in memory keeper Signed-off-by: WenyXu <wenymedia@gmail.com> * refactor(meta): register operating region roles from region routes Signed-off-by: WenyXu <wenymedia@gmail.com> * refactor(meta): require explicit operating region roles Signed-off-by: WenyXu <wenymedia@gmail.com> * fix(meta): renew operating region leases from keeper roles Signed-off-by: WenyXu <wenymedia@gmail.com> * test(common-meta): cover region route role helpers Signed-off-by: WenyXu <wenymedia@gmail.com> * test(meta): cover operating region role propagation Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: apply suggestions Signed-off-by: WenyXu <wenymedia@gmail.com> * fix(meta): prefer metadata roles for region lease renewal Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: add an index page (#7975) * feat: include an index page * fix: address code review * fix: let / auth gated * refactor: rename public-apis to public-api-prefix Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: remove redundant error messages in admin functions (#7953) Closes #7938 Signed-off-by: yxrxy <yxrxytrigger@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * perf: better jieba cut (#7984) * perf: better jieba cut Signed-off-by: discord9 <discord9@163.com> * fix: also filter pun mark Signed-off-by: discord9 <discord9@163.com> * chore Signed-off-by: discord9 <discord9@163.com> * docs: explain why Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: allow ipv4_num_to_string to accept valid integers (#7994) * fix: allow ipv4_num_to_string to accept valid integers Signed-off-by: Johannes Sluis <joesluis51@gmail.com> * test: update sqlness result file Signed-off-by: Johannes Sluis <joesluis51@gmail.com> * fix: use coercible integer signature for ipv4_num_to_string Signed-off-by: Johannes Sluis <joesluis51@gmail.com> --------- Signed-off-by: Johannes Sluis <joesluis51@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: update manifest state before deleting delta files (#8001) * fix: update state before deleting deltas Signed-off-by: evenyag <realevenyag@gmail.com> * chore: update comment Signed-off-by: evenyag <realevenyag@gmail.com> * chore: update log level Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: upgrade mysql metadata value limit to mediumblob (#7985) * fix: upgrade mysql metadata values to mediumblob * fix: fail mysql metadata startup on upgrade check errors Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: zh same underscore behavior (#8002) * fix: zh same underscore behavior Signed-off-by: discord9 <discord9@163.com> * fix: only add token with _ from en analyzer Signed-off-by: discord9 <discord9@163.com> * test: neg sqlness case Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * fix: manifest recovery scans after last version if possible (#8009) * feat: suppport scan with start after Signed-off-by: evenyag <realevenyag@gmail.com> * test: add start_after test Signed-off-by: evenyag <realevenyag@gmail.com> * chore: adjust remove dir warning Signed-off-by: evenyag <realevenyag@gmail.com> * test: test list_with_start_after Signed-off-by: evenyag <realevenyag@gmail.com> * fix: update get_paths call with start_after arg in checkpoint test Signed-off-by: evenyag <realevenyag@gmail.com> * feat: log scan metrics Signed-off-by: evenyag <realevenyag@gmail.com> * fix: fix start_after on manifest dir Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: add a standalone flag in plugins during startup (#7974) * chore: add a standalone flag in plugins during startup Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: add derive Signed-off-by: shuiyisong <xixing.sys@gmail.com> --------- Signed-off-by: shuiyisong <xixing.sys@gmail.com> Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: bump version to v1.0.1 Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com> Signed-off-by: liyang <daviderli614@gmail.com> Signed-off-by: Weixie Cui <cuiweixie@gmail.com> Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> Signed-off-by: evenyag <realevenyag@gmail.com> Signed-off-by: discord9 <discord9@163.com> Signed-off-by: luofucong <luofc@foxmail.com> Signed-off-by: yxrxy <yxrxytrigger@gmail.com> Signed-off-by: Johannes Sluis <joesluis51@gmail.com> Signed-off-by: shuiyisong <xixing.sys@gmail.com> Co-authored-by: Ning Sun <sunng@protonmail.com> Co-authored-by: liyang <daviderli614@gmail.com> Co-authored-by: cui <cuiweixie@gmail.com> Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com> Co-authored-by: discord9 <discord9@163.com> Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> Co-authored-by: Yingwen <realevenyag@gmail.com> Co-authored-by: fys <40801205+fengys1996@users.noreply.github.com> Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com> Co-authored-by: yxrxy <1532529704@qq.com> Co-authored-by: Joe Sluis <43276756+JoeS51@users.noreply.github.com> Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
1687 lines
51 KiB
Rust
1687 lines
51 KiB
Rust
// Copyright 2023 Greptime Team
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
use std::collections::HashMap;
|
|
use std::time::Duration;
|
|
|
|
use auth::user_provider_from_option;
|
|
use chrono::{DateTime, NaiveDate, NaiveDateTime, SecondsFormat, Utc};
|
|
use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
|
|
use common_frontend::slow_query_event::{
|
|
SLOW_QUERY_TABLE_COST_COLUMN_NAME, SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
|
|
};
|
|
use sqlx::mysql::{MySqlConnection, MySqlDatabaseError, MySqlPoolOptions};
|
|
use sqlx::postgres::{PgDatabaseError, PgPoolOptions};
|
|
use sqlx::types::Decimal;
|
|
use sqlx::{Connection, Executor, Row};
|
|
use tests_integration::test_util::{
|
|
StorageType, setup_mysql_server, setup_mysql_server_with_slow_query_threshold,
|
|
setup_mysql_server_with_user_provider, setup_pg_server,
|
|
setup_pg_server_with_slow_query_threshold, setup_pg_server_with_user_provider,
|
|
};
|
|
use tokio_postgres::{Client, NoTls, SimpleQueryMessage};
|
|
|
|
#[macro_export]
|
|
macro_rules! sql_test {
|
|
($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
|
|
paste::item! {
|
|
mod [<integration_sql_ $service:lower _test>] {
|
|
$(
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
$(
|
|
#[$meta]
|
|
)*
|
|
async fn [< $test >]() {
|
|
common_telemetry::init_default_ut_logging();
|
|
|
|
let store_type = tests_integration::test_util::StorageType::$service;
|
|
if store_type.test_on() {
|
|
common_telemetry::info!("test {} starts, store_type: {:?}", stringify!($test), store_type);
|
|
|
|
let _ = $crate::sql::$test(store_type).await;
|
|
}
|
|
|
|
}
|
|
)*
|
|
}
|
|
}
|
|
};
|
|
}
|
|
|
|
#[macro_export]
|
|
macro_rules! sql_tests {
|
|
($($service:ident),*) => {
|
|
$(
|
|
sql_test!(
|
|
$service,
|
|
|
|
test_mysql_auth,
|
|
test_mysql_stmts,
|
|
test_mysql_crud,
|
|
test_mysql_timezone,
|
|
test_mysql_async_timestamp,
|
|
test_mysql_slow_query,
|
|
test_postgres_auth,
|
|
test_postgres_crud,
|
|
test_postgres_timezone,
|
|
test_postgres_bytea,
|
|
test_postgres_slow_query,
|
|
test_postgres_datestyle,
|
|
test_postgres_intervalstyle,
|
|
test_postgres_parameter_inference,
|
|
test_postgres_uint64_parameter,
|
|
test_postgres_array_types,
|
|
test_mysql_prepare_stmt_insert_timestamp,
|
|
test_declare_fetch_close_cursor,
|
|
test_alter_update_on,
|
|
);
|
|
)*
|
|
};
|
|
}
|
|
|
|
pub async fn test_mysql_auth(store_type: StorageType) {
|
|
let user_provider = user_provider_from_option(
|
|
"static_user_provider:cmd:greptime_user=greptime_pwd,readonly_user:ro=readonly_pwd,writeonly_user:wo=writeonly_pwd",
|
|
)
|
|
.unwrap();
|
|
|
|
let (mut guard, fe_mysql_server) =
|
|
setup_mysql_server_with_user_provider(store_type, "sql_crud", Some(user_provider)).await;
|
|
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
|
|
|
|
// 1. no auth
|
|
let conn_re = MySqlPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("mysql://{addr}/public"))
|
|
.await;
|
|
|
|
assert!(conn_re.is_err());
|
|
assert_eq!(
|
|
conn_re
|
|
.err()
|
|
.unwrap()
|
|
.into_database_error()
|
|
.unwrap()
|
|
.downcast::<MySqlDatabaseError>()
|
|
.code(),
|
|
Some("28000")
|
|
);
|
|
|
|
// 2. wrong pwd
|
|
let conn_re = MySqlPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("mysql://greptime_user:wrong_pwd@{addr}/public"))
|
|
.await;
|
|
|
|
assert!(conn_re.is_err());
|
|
assert_eq!(
|
|
conn_re
|
|
.err()
|
|
.unwrap()
|
|
.into_database_error()
|
|
.unwrap()
|
|
.downcast::<MySqlDatabaseError>()
|
|
.code(),
|
|
Some("28000")
|
|
);
|
|
|
|
// 3. right pwd
|
|
let conn_re = MySqlPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("mysql://greptime_user:greptime_pwd@{addr}/public"))
|
|
.await;
|
|
|
|
assert!(conn_re.is_ok());
|
|
|
|
// 4. readonly user
|
|
let conn_re = MySqlPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("mysql://readonly_user:readonly_pwd@{addr}/public"))
|
|
.await;
|
|
assert!(conn_re.is_ok());
|
|
let pool = conn_re.unwrap();
|
|
let _ = pool.execute("SELECT 1").await.unwrap();
|
|
let err = pool
|
|
.execute("CREATE TABLE test (ts timestamp time index)")
|
|
.await
|
|
.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("(PermissionDenied): User is not authorized to perform this action"),
|
|
"{}",
|
|
err.to_string()
|
|
);
|
|
|
|
// 5. writeonly user
|
|
let conn_re = MySqlPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!(
|
|
"mysql://writeonly_user:writeonly_pwd@{addr}/public"
|
|
))
|
|
.await;
|
|
assert!(conn_re.is_ok());
|
|
let pool = conn_re.unwrap();
|
|
let _ = pool
|
|
.execute("CREATE TABLE test (ts timestamp time index)")
|
|
.await
|
|
.unwrap();
|
|
let err = pool.execute("SHOW TABLES").await.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("(PermissionDenied): User is not authorized to perform this action"),
|
|
"{}",
|
|
err.to_string()
|
|
);
|
|
|
|
let _ = fe_mysql_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_mysql_stmts(store_type: StorageType) {
|
|
common_telemetry::init_default_ut_logging();
|
|
|
|
let (mut guard, fe_mysql_server) = setup_mysql_server(store_type, "test_mysql_stmts").await;
|
|
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
|
|
|
|
let mut conn = MySqlConnection::connect(&format!("mysql://{addr}/public"))
|
|
.await
|
|
.unwrap();
|
|
|
|
conn.execute("SET SESSION TRANSACTION READ ONLY")
|
|
.await
|
|
.unwrap();
|
|
|
|
conn.execute("SET TRANSACTION READ ONLY").await.unwrap();
|
|
|
|
// empty statements
|
|
// Only when "--" is followed by a whitespace is it considered a valid comment in MySQL,
|
|
// see https://dev.mysql.com/doc/refman/8.4/en/ansi-diff-comments.html
|
|
let err = conn.execute(" -- ----- ;").await.unwrap_err();
|
|
assert!(err.to_string().contains("empty statements"));
|
|
let err = conn.execute("-- --------\n;").await.unwrap_err();
|
|
assert!(err.to_string().contains("empty statements"));
|
|
let err = conn.execute(" ;").await.unwrap_err();
|
|
assert!(err.to_string().contains("empty statements"));
|
|
let err = conn.execute(" \n ;").await.unwrap_err();
|
|
assert!(err.to_string().contains("empty statements"));
|
|
|
|
let _ = fe_mysql_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_mysql_crud(store_type: StorageType) {
|
|
common_telemetry::init_default_ut_logging();
|
|
|
|
let (mut guard, fe_mysql_server) = setup_mysql_server(store_type, "test_mysql_crud").await;
|
|
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
|
|
|
|
let pool = MySqlPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("mysql://{addr}/public"))
|
|
.await
|
|
.unwrap();
|
|
|
|
sqlx::query(
|
|
"create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt timestamp(3) default null, b blob default null, j json, v vector(3) default null)",
|
|
)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
for i in 0..10 {
|
|
let dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
|
|
chrono::DateTime::from_timestamp(60, i).unwrap().naive_utc(),
|
|
Utc,
|
|
);
|
|
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
|
|
let hello = format!("hello{i}");
|
|
let bytes = hello.as_bytes();
|
|
let json = serde_json::json!({
|
|
"code": i,
|
|
"success": true,
|
|
"payload": {
|
|
"features": [
|
|
"serde",
|
|
"json"
|
|
],
|
|
"homepage": null
|
|
}
|
|
});
|
|
let vector = "[1,2,3]";
|
|
sqlx::query("insert into demo values(?, ?, ?, ?, ?, ?, ?)")
|
|
.bind(i)
|
|
.bind(i)
|
|
.bind(d)
|
|
.bind(dt)
|
|
.bind(bytes)
|
|
.bind(json)
|
|
.bind(vector)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let rows = sqlx::query("select i, d, dt, b, j, v from demo")
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(rows.len(), 10);
|
|
|
|
for (i, row) in rows.iter().enumerate() {
|
|
let ret: i64 = row.get("i");
|
|
let d: NaiveDate = row.get("d");
|
|
let dt: DateTime<Utc> = row.get("dt");
|
|
let bytes: Vec<u8> = row.get("b");
|
|
let json: serde_json::Value = row.get("j");
|
|
let vector: Vec<u8> = row.get("v");
|
|
assert_eq!(ret, i as i64);
|
|
let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
|
|
assert_eq!(expected_d, d);
|
|
let expected_dt: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
|
|
chrono::DateTime::from_timestamp(60, i as u32)
|
|
.unwrap()
|
|
.naive_utc(),
|
|
Utc,
|
|
);
|
|
assert_eq!(
|
|
format!("{}", expected_dt.format("%Y-%m-%d %H:%M:%S")),
|
|
format!("{}", dt.format("%Y-%m-%d %H:%M:%S"))
|
|
);
|
|
assert_eq!(format!("hello{i}"), String::from_utf8_lossy(&bytes));
|
|
let expected_j = serde_json::json!({
|
|
"code": i,
|
|
"success": true,
|
|
"payload": {
|
|
"features": [
|
|
"serde",
|
|
"json"
|
|
],
|
|
"homepage": null
|
|
}
|
|
});
|
|
assert_eq!(json, expected_j);
|
|
assert_eq!(vector, "[1,2,3]".as_bytes());
|
|
}
|
|
|
|
let rows = sqlx::query("select i from demo where i=?")
|
|
.bind(6)
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(rows.len(), 1);
|
|
|
|
for row in rows {
|
|
let ret: i64 = row.get("i");
|
|
assert_eq!(ret, 6);
|
|
}
|
|
|
|
// parameter type mismatch
|
|
let query_re = sqlx::query("select i from demo where i = ?")
|
|
.bind("test")
|
|
.fetch_all(&pool)
|
|
.await;
|
|
assert!(query_re.is_err());
|
|
let err = query_re.unwrap_err();
|
|
common_telemetry::info!("Error is {}", err);
|
|
assert_eq!(
|
|
err.into_database_error()
|
|
.unwrap()
|
|
.downcast::<MySqlDatabaseError>()
|
|
.number(),
|
|
1210,
|
|
);
|
|
|
|
let _ = sqlx::query("delete from demo")
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
let rows = sqlx::query("select i from demo")
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(rows.len(), 0);
|
|
|
|
// test prepare with default columns
|
|
sqlx::query("insert into demo(i) values(?)")
|
|
.bind(99)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
sqlx::query("insert into demo(i) values(?)")
|
|
.bind(-99)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
let rows = sqlx::query("select * from demo")
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(rows.len(), 2);
|
|
|
|
for row in rows {
|
|
let i: i64 = row.get("i");
|
|
let ts: DateTime<Utc> = row.get("ts");
|
|
let now = common_time::util::current_time_millis();
|
|
assert!(now - ts.timestamp_millis() < 1000);
|
|
assert_eq!(i.abs(), 99);
|
|
}
|
|
|
|
let _ = fe_mysql_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_mysql_timezone(store_type: StorageType) {
|
|
common_telemetry::init_default_ut_logging();
|
|
|
|
let (mut guard, fe_mysql_server) = setup_mysql_server(store_type, "test_mysql_timezone").await;
|
|
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
|
|
|
|
let mut conn = MySqlConnection::connect(&format!("mysql://{addr}/public"))
|
|
.await
|
|
.unwrap();
|
|
|
|
let _ = conn
|
|
.execute("SET time_zone = 'Asia/Shanghai'")
|
|
.await
|
|
.unwrap();
|
|
|
|
let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
|
|
assert_eq!(timezone[0].get::<String, usize>(0), "Asia/Shanghai");
|
|
let timezone = conn.fetch_all("SELECT @@session.time_zone").await.unwrap();
|
|
assert_eq!(timezone[0].get::<String, usize>(0), "Asia/Shanghai");
|
|
let timezone = conn.fetch_all("SELECT @@system_time_zone").await.unwrap();
|
|
assert_eq!(timezone[0].get::<String, usize>(0), "UTC");
|
|
let _ = conn.execute("SET time_zone = 'UTC'").await.unwrap();
|
|
let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
|
|
assert_eq!(timezone[0].get::<String, usize>(0), "UTC");
|
|
let timezone = conn.fetch_all("SELECT @@system_time_zone").await.unwrap();
|
|
assert_eq!(timezone[0].get::<String, usize>(0), "UTC");
|
|
|
|
// test data
|
|
let _ = conn
|
|
.execute("create table demo(i bigint, ts timestamp time index)")
|
|
.await
|
|
.unwrap();
|
|
let _ = conn
|
|
.execute("insert into demo values(1, 1667446797450)")
|
|
.await
|
|
.unwrap();
|
|
let rows = conn.fetch_all("select ts from demo").await.unwrap();
|
|
assert_eq!(
|
|
rows[0]
|
|
.get::<chrono::DateTime<Utc>, usize>(0)
|
|
.to_rfc3339_opts(SecondsFormat::Millis, true),
|
|
"2022-11-03T03:39:57.450Z"
|
|
);
|
|
|
|
let _ = conn.execute("SET time_zone = '+08:00'").await.unwrap();
|
|
let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
|
|
assert_eq!(timezone[0].get::<String, usize>(0), "+08:00");
|
|
let timezone = conn.fetch_all("SELECT @@session.time_zone").await.unwrap();
|
|
assert_eq!(timezone[0].get::<String, usize>(0), "+08:00");
|
|
|
|
let rows2 = conn.fetch_all("select ts from demo").await.unwrap();
|
|
// we use Utc here for format only
|
|
assert_eq!(
|
|
rows2[0]
|
|
.get::<chrono::DateTime<Utc>, usize>(0)
|
|
.to_rfc3339_opts(SecondsFormat::Millis, true),
|
|
"2022-11-03T11:39:57.450Z"
|
|
);
|
|
|
|
let _ = conn
|
|
.execute("SET @@SESSION.TIME_ZONE = '-7:00'")
|
|
.await
|
|
.unwrap();
|
|
let rows2 = conn.fetch_all("select ts from demo").await.unwrap();
|
|
// we use Utc here for format only
|
|
assert_eq!(
|
|
rows2[0]
|
|
.get::<chrono::DateTime<Utc>, usize>(0)
|
|
.to_rfc3339_opts(SecondsFormat::Millis, true),
|
|
"2022-11-02T20:39:57.450Z"
|
|
);
|
|
let timezone = conn.fetch_all("SELECT @@time_zone").await.unwrap();
|
|
assert_eq!(timezone[0].get::<String, usize>(0), "-07:00");
|
|
let timezone = conn.fetch_all("SELECT @@session.time_zone").await.unwrap();
|
|
assert_eq!(timezone[0].get::<String, usize>(0), "-07:00");
|
|
|
|
let _ = fe_mysql_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_auth(store_type: StorageType) {
|
|
let user_provider =
|
|
user_provider_from_option("static_user_provider:cmd:greptime_user=greptime_pwd").unwrap();
|
|
|
|
let (mut guard, fe_pg_server) =
|
|
setup_pg_server_with_user_provider(store_type, "sql_crud", Some(user_provider)).await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
// 1. no auth
|
|
let conn_re = PgPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("postgres://{addr}/public"))
|
|
.await;
|
|
|
|
assert!(conn_re.is_err());
|
|
assert_eq!(
|
|
conn_re
|
|
.err()
|
|
.unwrap()
|
|
.into_database_error()
|
|
.unwrap()
|
|
.downcast::<PgDatabaseError>()
|
|
.code(),
|
|
"28P01"
|
|
);
|
|
|
|
// 2. wrong pwd
|
|
let conn_re = PgPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("postgres://greptime_user:wrong_pwd@{addr}/public"))
|
|
.await;
|
|
|
|
assert!(conn_re.is_err());
|
|
assert_eq!(
|
|
conn_re
|
|
.err()
|
|
.unwrap()
|
|
.into_database_error()
|
|
.unwrap()
|
|
.downcast::<PgDatabaseError>()
|
|
.code(),
|
|
"28P01"
|
|
);
|
|
|
|
// 2. right pwd
|
|
let conn_re = PgPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!(
|
|
"postgres://greptime_user:greptime_pwd@{addr}/public"
|
|
))
|
|
.await;
|
|
|
|
assert!(conn_re.is_ok());
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_alter_update_on(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_crud").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let pool = PgPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("postgres://{addr}/public"))
|
|
.await
|
|
.unwrap();
|
|
|
|
sqlx::query(
|
|
"create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob)",
|
|
)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
let row_before_alter = sqlx::query(
|
|
"SELECT *
|
|
FROM information_schema.tables WHERE table_name = $1;",
|
|
)
|
|
.bind("demo")
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(row_before_alter.len(), 1);
|
|
let before_row = &row_before_alter[0];
|
|
|
|
let created_on: NaiveDateTime = before_row.get("create_time");
|
|
let updated_on_before: NaiveDateTime = before_row.get("update_time");
|
|
assert_eq!(created_on, updated_on_before);
|
|
|
|
std::thread::sleep(std::time::Duration::from_millis(1100));
|
|
|
|
sqlx::query("alter table demo add column j json;")
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
let row_after_alter = sqlx::query(
|
|
"SELECT *
|
|
FROM information_schema.tables WHERE table_name = $1;",
|
|
)
|
|
.bind("demo")
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(row_after_alter.len(), 1);
|
|
let after_row = &row_after_alter[0];
|
|
|
|
let updated_on_after: NaiveDateTime = after_row.get("update_time");
|
|
assert_ne!(updated_on_before, updated_on_after);
|
|
|
|
let _ = sqlx::query("delete from demo")
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_crud(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_crud").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let pool = PgPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("postgres://{addr}/public"))
|
|
.await
|
|
.unwrap();
|
|
|
|
sqlx::query(
|
|
"create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob, j json)",
|
|
)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
for i in 0..10 {
|
|
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
|
|
let dt = d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_micros();
|
|
let bytes = "hello".as_bytes();
|
|
let json = serde_json::json!({
|
|
"code": i,
|
|
"success": true,
|
|
"payload": {
|
|
"features": [
|
|
"serde",
|
|
"json"
|
|
],
|
|
"homepage": null
|
|
}
|
|
});
|
|
|
|
sqlx::query("insert into demo values($1, $2, $3, $4, $5, $6)")
|
|
.bind(i)
|
|
.bind(i)
|
|
.bind(d)
|
|
.bind(dt)
|
|
.bind(bytes)
|
|
.bind(json)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let rows = sqlx::query("select i,d,dt,b,j from demo")
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(rows.len(), 10);
|
|
|
|
for (i, row) in rows.iter().enumerate() {
|
|
let ret: i64 = row.get("i");
|
|
let d: NaiveDate = row.get("d");
|
|
let dt: NaiveDateTime = row.get("dt");
|
|
let bytes: Vec<u8> = row.get("b");
|
|
let json: serde_json::Value = row.get("j");
|
|
|
|
assert_eq!(ret, i as i64);
|
|
|
|
let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
|
|
assert_eq!(expected_d, d);
|
|
|
|
let expected_dt = NaiveDate::from_yo_opt(2015, 100)
|
|
.and_then(|d| d.and_hms_opt(0, 0, 0))
|
|
.unwrap();
|
|
assert_eq!(expected_dt, dt);
|
|
assert_eq!("hello".as_bytes(), bytes);
|
|
|
|
let expected_j = serde_json::json!({
|
|
"code": i,
|
|
"success": true,
|
|
"payload": {
|
|
"features": [
|
|
"serde",
|
|
"json"
|
|
],
|
|
"homepage": null
|
|
}
|
|
});
|
|
assert_eq!(json.to_string(), expected_j.to_string());
|
|
}
|
|
|
|
let rows = sqlx::query("select i from demo where i=$1")
|
|
.bind(6)
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(rows.len(), 1);
|
|
|
|
for row in rows {
|
|
let ret: i64 = row.get("i");
|
|
assert_eq!(ret, 6);
|
|
}
|
|
|
|
let _ = sqlx::query("delete from demo")
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
let rows = sqlx::query("select i from demo")
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(rows.len(), 0);
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_mysql_slow_query(store_type: StorageType) {
|
|
common_telemetry::init_default_ut_logging();
|
|
|
|
let (mut guard, fe_mysql_server) = setup_mysql_server_with_slow_query_threshold(
|
|
store_type,
|
|
"test_mysql_slow_query",
|
|
Duration::from_millis(100),
|
|
)
|
|
.await;
|
|
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
|
|
|
|
let pool = MySqlPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("mysql://{addr}/public"))
|
|
.await
|
|
.unwrap();
|
|
|
|
// The slow query should run longer than the configured threshold.
|
|
let slow_query = "SELECT count(*) FROM generate_series(1, 50000000)";
|
|
|
|
// Simulate a slow query.
|
|
sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
|
|
|
|
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
|
|
let query = format!(
|
|
"SELECT {}, {}, {}, {} FROM {table} WHERE {} = ?",
|
|
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
|
|
);
|
|
|
|
let row = tokio::time::timeout(Duration::from_secs(10), async {
|
|
loop {
|
|
if let Ok(Some(row)) = sqlx::query(&query)
|
|
.bind(slow_query)
|
|
.fetch_optional(&pool)
|
|
.await
|
|
{
|
|
break row;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
}
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
// Check the results.
|
|
let cost: u64 = row.get(0);
|
|
let threshold: u64 = row.get(1);
|
|
let query: String = row.get(2);
|
|
let is_promql: bool = row.get(3);
|
|
|
|
assert!(cost > 0 && threshold > 0 && cost > threshold);
|
|
assert_eq!(query, slow_query);
|
|
assert!(!is_promql);
|
|
|
|
let _ = fe_mysql_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_bytea(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_bytea").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
|
|
.await
|
|
.unwrap();
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
tokio::spawn(async move {
|
|
connection.await.unwrap();
|
|
tx.send(()).unwrap();
|
|
});
|
|
let _ = client
|
|
.simple_query("CREATE TABLE test(b BLOB, ts TIMESTAMP TIME INDEX)")
|
|
.await
|
|
.unwrap();
|
|
let _ = client
|
|
.simple_query("INSERT INTO test VALUES(X'6162636b6c6d2aa954', 0)")
|
|
.await
|
|
.unwrap();
|
|
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
|
|
match &mess[1] {
|
|
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
|
|
_ => unreachable!(),
|
|
}
|
|
};
|
|
|
|
let r = client.simple_query("SELECT b FROM test").await.unwrap();
|
|
let b = get_row(r);
|
|
assert_eq!(b, "\\x6162636b6c6d2aa954");
|
|
|
|
let _ = client.simple_query("SET bytea_output='hex'").await.unwrap();
|
|
let r = client.simple_query("SELECT b FROM test").await.unwrap();
|
|
let b = get_row(r);
|
|
assert_eq!(b, "\\x6162636b6c6d2aa954");
|
|
|
|
let _ = client
|
|
.simple_query("SET bytea_output='escape'")
|
|
.await
|
|
.unwrap();
|
|
let r = client.simple_query("SELECT b FROM test").await.unwrap();
|
|
let b = get_row(r);
|
|
assert_eq!(b, "abcklm*\\251T");
|
|
|
|
let _e = client
|
|
.simple_query("SET bytea_output='invalid'")
|
|
.await
|
|
.unwrap_err();
|
|
|
|
// binary format shall not be affected by bytea_output
|
|
let pool = PgPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("postgres://{addr}/public"))
|
|
.await
|
|
.unwrap();
|
|
|
|
let row = sqlx::query("select b from test")
|
|
.fetch_one(&pool)
|
|
.await
|
|
.unwrap();
|
|
let val: Vec<u8> = row.get("b");
|
|
assert_eq!(val, [97, 98, 99, 107, 108, 109, 42, 169, 84]);
|
|
|
|
drop(client);
|
|
rx.await.unwrap();
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_slow_query(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) = setup_pg_server_with_slow_query_threshold(
|
|
store_type,
|
|
"test_postgres_slow_query",
|
|
Duration::from_millis(100),
|
|
)
|
|
.await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let pool = PgPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("postgres://{addr}/public"))
|
|
.await
|
|
.unwrap();
|
|
|
|
let slow_query = "SELECT count(*) FROM generate_series(1, 50000000)";
|
|
let _ = sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
|
|
|
|
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
|
|
let query = format!(
|
|
"SELECT {}, {}, {}, {} FROM {table} WHERE {} = $1",
|
|
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
|
|
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
|
|
);
|
|
|
|
let row = tokio::time::timeout(Duration::from_secs(10), async {
|
|
loop {
|
|
if let Ok(Some(row)) = sqlx::query(&query)
|
|
.bind(slow_query)
|
|
.fetch_optional(&pool)
|
|
.await
|
|
{
|
|
break row;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
}
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let cost: Decimal = row.get(0);
|
|
let threshold: Decimal = row.get(1);
|
|
let query: String = row.get(2);
|
|
let is_promql: bool = row.get(3);
|
|
|
|
assert!(cost > 0.into() && threshold > 0.into() && cost > threshold);
|
|
assert_eq!(query, slow_query);
|
|
assert!(!is_promql);
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_datestyle(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_datestyle").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
|
|
.await
|
|
.unwrap();
|
|
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
tokio::spawn(async move {
|
|
connection.await.unwrap();
|
|
tx.send(()).unwrap();
|
|
});
|
|
|
|
let validate_datestyle = |client: Client, datestyle: &str, is_valid: bool| {
|
|
let datestyle = datestyle.to_string();
|
|
async move {
|
|
assert_eq!(
|
|
client
|
|
.simple_query(format!("SET DATESTYLE={}", datestyle).as_str())
|
|
.await
|
|
.is_ok(),
|
|
is_valid
|
|
);
|
|
client
|
|
}
|
|
};
|
|
|
|
// style followed by order is valid
|
|
let client = validate_datestyle(client, "'ISO,MDY'", true).await;
|
|
|
|
// Mix of string and ident is valid
|
|
let client = validate_datestyle(client, "'ISO',MDY", true).await;
|
|
|
|
// list of string that didn't corrupt is valid
|
|
let client = validate_datestyle(client, "'ISO,MDY','ISO,MDY'", true).await;
|
|
|
|
// corrupted style
|
|
let client = validate_datestyle(client, "'ISO,German'", false).await;
|
|
|
|
// corrupted order
|
|
let client = validate_datestyle(client, "'ISO,DMY','ISO,MDY'", false).await;
|
|
|
|
// as long as the value is not corrupted, it's valid
|
|
let client = validate_datestyle(client, "ISO,ISO,ISO,ISO,ISO,MDY,MDY,MDY,MDY", true).await;
|
|
|
|
let _ = client
|
|
.simple_query("CREATE TABLE ts_test(ts TIMESTAMP TIME INDEX)")
|
|
.await
|
|
.expect("CREATE TABLE ts_test ERROR");
|
|
let _ = client
|
|
.simple_query("CREATE TABLE date_test(d date, ts TIMESTAMP TIME INDEX)")
|
|
.await
|
|
.expect("CREATE TABLE date_test ERROR");
|
|
|
|
let _ = client
|
|
.simple_query("CREATE TABLE dt_test(dt datetime, ts TIMESTAMP TIME INDEX)")
|
|
.await
|
|
.expect("CREATE TABLE dt_test ERROR");
|
|
|
|
let _ = client
|
|
.simple_query("INSERT INTO ts_test VALUES('1997-12-17 07:37:16.123')")
|
|
.await
|
|
.expect("INSERT INTO ts_test ERROR");
|
|
|
|
let _ = client
|
|
.simple_query("INSERT INTO date_test VALUES('1997-12-17', '1997-12-17 07:37:16.123')")
|
|
.await
|
|
.expect("INSERT INTO date_test ERROR");
|
|
|
|
let _ = client
|
|
.simple_query(
|
|
"INSERT INTO dt_test VALUES('1997-12-17 07:37:16.123', '1997-12-17 07:37:16.123')",
|
|
)
|
|
.await
|
|
.expect("INSERT INTO dt_test ERROR");
|
|
|
|
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
|
|
match &mess[1] {
|
|
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
|
|
_ => unreachable!("Unexpected messages: {:?}", mess),
|
|
}
|
|
};
|
|
|
|
let date = "DATE";
|
|
let datetime = "TIMESTAMP";
|
|
let timestamp = "TIMESTAMP";
|
|
|
|
let iso = "ISO";
|
|
let sql = "SQL";
|
|
let postgres = "Postgres";
|
|
let german = "German";
|
|
|
|
let expected_set: HashMap<&str, HashMap<&str, HashMap<&str, &str>>> = HashMap::from([
|
|
(
|
|
date,
|
|
HashMap::from([
|
|
(
|
|
iso,
|
|
HashMap::from([
|
|
("MDY", "1997-12-17"),
|
|
("DMY", "1997-12-17"),
|
|
("YMD", "1997-12-17"),
|
|
]),
|
|
),
|
|
(
|
|
sql,
|
|
HashMap::from([
|
|
("MDY", "12/17/1997"),
|
|
("DMY", "17/12/1997"),
|
|
("YMD", "12/17/1997"),
|
|
]),
|
|
),
|
|
(
|
|
postgres,
|
|
HashMap::from([
|
|
("MDY", "12-17-1997"),
|
|
("DMY", "17-12-1997"),
|
|
("YMD", "12-17-1997"),
|
|
]),
|
|
),
|
|
(
|
|
german,
|
|
HashMap::from([
|
|
("MDY", "17.12.1997"),
|
|
("DMY", "17.12.1997"),
|
|
("YMD", "17.12.1997"),
|
|
]),
|
|
),
|
|
]),
|
|
),
|
|
(
|
|
timestamp,
|
|
HashMap::from([
|
|
(
|
|
iso,
|
|
HashMap::from([
|
|
("MDY", "1997-12-17 07:37:16.123000"),
|
|
("DMY", "1997-12-17 07:37:16.123000"),
|
|
("YMD", "1997-12-17 07:37:16.123000"),
|
|
]),
|
|
),
|
|
(
|
|
sql,
|
|
HashMap::from([
|
|
("MDY", "12/17/1997 07:37:16.123000"),
|
|
("DMY", "17/12/1997 07:37:16.123000"),
|
|
("YMD", "12/17/1997 07:37:16.123000"),
|
|
]),
|
|
),
|
|
(
|
|
postgres,
|
|
HashMap::from([
|
|
("MDY", "Wed Dec 17 07:37:16.123000 1997"),
|
|
("DMY", "Wed 17 Dec 07:37:16.123000 1997"),
|
|
("YMD", "Wed Dec 17 07:37:16.123000 1997"),
|
|
]),
|
|
),
|
|
(
|
|
german,
|
|
HashMap::from([
|
|
("MDY", "17.12.1997 07:37:16.123000"),
|
|
("DMY", "17.12.1997 07:37:16.123000"),
|
|
("YMD", "17.12.1997 07:37:16.123000"),
|
|
]),
|
|
),
|
|
]),
|
|
),
|
|
]);
|
|
|
|
let get_expected = |ty: &str, style: &str, order: &str| {
|
|
expected_set
|
|
.get(ty)
|
|
.and_then(|m| m.get(style))
|
|
.and_then(|m2| m2.get(order))
|
|
.unwrap()
|
|
.to_string()
|
|
};
|
|
|
|
for style in ["ISO", "SQL", "Postgres", "German"] {
|
|
for order in ["MDY", "DMY", "YMD"] {
|
|
let _ = client
|
|
.simple_query(&format!("SET DATESTYLE='{}', '{}'", style, order))
|
|
.await
|
|
.expect("SET DATESTYLE ERROR");
|
|
|
|
let r = client.simple_query("SELECT ts FROM ts_test").await.unwrap();
|
|
let ts = get_row(r);
|
|
assert_eq!(
|
|
ts,
|
|
get_expected(timestamp, style, order),
|
|
"style: {}, order: {}",
|
|
style,
|
|
order
|
|
);
|
|
|
|
let r = client
|
|
.simple_query("SELECT d FROM date_test")
|
|
.await
|
|
.unwrap();
|
|
let d = get_row(r);
|
|
assert_eq!(
|
|
d,
|
|
get_expected(date, style, order),
|
|
"style: {}, order: {}",
|
|
style,
|
|
order
|
|
);
|
|
|
|
let r = client.simple_query("SELECT dt FROM dt_test").await.unwrap();
|
|
let dt = get_row(r);
|
|
assert_eq!(
|
|
dt,
|
|
get_expected(datetime, style, order),
|
|
"style: {}, order: {}",
|
|
style,
|
|
order
|
|
);
|
|
}
|
|
}
|
|
|
|
drop(client);
|
|
rx.await.unwrap();
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_intervalstyle(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) =
|
|
setup_pg_server(store_type, "test_postgres_intervalstyle").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
|
|
.await
|
|
.unwrap();
|
|
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
tokio::spawn(async move {
|
|
connection.await.unwrap();
|
|
tx.send(()).unwrap();
|
|
});
|
|
|
|
let validate_intervalstyle = |client: Client, intervalstyle: &str, is_valid: bool| {
|
|
let intervalstyle = intervalstyle.to_string();
|
|
async move {
|
|
assert_eq!(
|
|
client
|
|
.simple_query(format!("SET INTERVALSTYLE='{}'", intervalstyle).as_str())
|
|
.await
|
|
.is_ok(),
|
|
is_valid,
|
|
"testing intervalstyle {intervalstyle}"
|
|
);
|
|
client
|
|
}
|
|
};
|
|
|
|
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
|
|
match &mess[1] {
|
|
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
|
|
_ => unreachable!(),
|
|
}
|
|
};
|
|
|
|
let client = validate_intervalstyle(client, "iso_8601", true).await;
|
|
let client = validate_intervalstyle(client, "sql_standard", true).await;
|
|
let client = validate_intervalstyle(client, "postgres", true).await;
|
|
let client = validate_intervalstyle(client, "postgres_verbose", true).await;
|
|
let client = validate_intervalstyle(client, "invalid_style", false).await;
|
|
|
|
let expected_formats: HashMap<&str, &str> = HashMap::from([
|
|
("iso_8601", "P1DT2H3M"),
|
|
("sql_standard", "1 2:03:00"),
|
|
("postgres", "1 day 02:03:00"),
|
|
("postgres_verbose", "@ 1 day 2 hours 3 mins"),
|
|
]);
|
|
|
|
for (style, expected_format) in expected_formats {
|
|
let _ = client
|
|
.simple_query(&format!("SET INTERVALSTYLE='{}'", style))
|
|
.await
|
|
.expect("SET INTERVALSTYLE ERROR");
|
|
|
|
let interval = get_row(
|
|
client
|
|
.simple_query("SHOW VARIABLES intervalstyle")
|
|
.await
|
|
.unwrap(),
|
|
);
|
|
assert_eq!(interval, style);
|
|
|
|
let result = get_row(
|
|
client
|
|
.simple_query("SELECT INTERVAL '1 day 2 hours 3 minutes'")
|
|
.await
|
|
.unwrap(),
|
|
);
|
|
assert_eq!(
|
|
result, expected_format,
|
|
"intervalstyle {}: expected '{}', got '{}'",
|
|
style, expected_format, result
|
|
);
|
|
}
|
|
|
|
drop(client);
|
|
rx.await.unwrap();
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_timezone(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_timezone").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
|
|
.await
|
|
.unwrap();
|
|
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
tokio::spawn(async move {
|
|
connection.await.unwrap();
|
|
tx.send(()).unwrap();
|
|
});
|
|
|
|
let get_row = |mess: Vec<SimpleQueryMessage>| -> String {
|
|
match &mess[1] {
|
|
SimpleQueryMessage::Row(row) => row.get(0).unwrap().to_string(),
|
|
_ => unreachable!(),
|
|
}
|
|
};
|
|
|
|
let _ = client.simple_query("SET time_zone = 'UTC'").await.unwrap();
|
|
let timezone = get_row(
|
|
client
|
|
.simple_query("SHOW VARIABLES time_zone")
|
|
.await
|
|
.unwrap(),
|
|
);
|
|
assert_eq!(timezone, "UTC");
|
|
let timezone = get_row(
|
|
client
|
|
.simple_query("SHOW VARIABLES system_time_zone")
|
|
.await
|
|
.unwrap(),
|
|
);
|
|
assert_eq!(timezone, "UTC");
|
|
let _ = client
|
|
.simple_query("SET time_zone = 'Asia/Shanghai'")
|
|
.await
|
|
.unwrap();
|
|
let timezone = get_row(
|
|
client
|
|
.simple_query("SHOW VARIABLES time_zone")
|
|
.await
|
|
.unwrap(),
|
|
);
|
|
assert_eq!(timezone, "Asia/Shanghai");
|
|
let timezone = get_row(
|
|
client
|
|
.simple_query("SHOW VARIABLES system_time_zone")
|
|
.await
|
|
.unwrap(),
|
|
);
|
|
assert_eq!(timezone, "UTC");
|
|
|
|
drop(client);
|
|
rx.await.unwrap();
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_parameter_inference(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) =
|
|
setup_pg_server(store_type, "test_postgres_parameter_inference").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
|
|
.await
|
|
.unwrap();
|
|
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
tokio::spawn(async move {
|
|
connection.await.unwrap();
|
|
tx.send(()).unwrap();
|
|
});
|
|
|
|
// Create demo table
|
|
let _ = client
|
|
.simple_query("create table demo(i bigint, ts timestamp time index, d date, dt datetime)")
|
|
.await
|
|
.unwrap();
|
|
|
|
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
|
|
let dt = d.and_hms_opt(0, 0, 0).unwrap();
|
|
let _ = client
|
|
.execute(
|
|
"INSERT INTO demo VALUES($1, $2, $3, $4)",
|
|
&[&0i64, &dt, &d, &dt],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let rows = client
|
|
.query("SELECT * FROM demo WHERE i = $1", &[&0i64])
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(1, rows.len());
|
|
|
|
// Shutdown the client.
|
|
drop(client);
|
|
rx.await.unwrap();
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_uint64_parameter(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) =
|
|
setup_pg_server(store_type, "test_postgres_uint64_parameter").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
|
|
.await
|
|
.unwrap();
|
|
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
tokio::spawn(async move {
|
|
connection.await.unwrap();
|
|
tx.send(()).unwrap();
|
|
});
|
|
|
|
let _ = client
|
|
.simple_query("create table demo_u64(v bigint unsigned, ts timestamp time index)")
|
|
.await
|
|
.unwrap();
|
|
|
|
let dt = NaiveDate::from_yo_opt(2015, 100)
|
|
.unwrap()
|
|
.and_hms_opt(0, 0, 0)
|
|
.unwrap();
|
|
let _ = client
|
|
.execute(
|
|
"INSERT INTO demo_u64 VALUES($1, $2)",
|
|
&[&Decimal::from(123456u64), &dt],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let rows = client
|
|
.query(
|
|
"SELECT count(*) FROM demo_u64 WHERE v = $1",
|
|
&[&Decimal::from(123456u64)],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(1, rows.len());
|
|
let count: i64 = rows[0].get(0);
|
|
assert_eq!(count, 1);
|
|
|
|
drop(client);
|
|
rx.await.unwrap();
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_mysql_async_timestamp(store_type: StorageType) {
|
|
use mysql_async::prelude::*;
|
|
use time::PrimitiveDateTime;
|
|
|
|
#[derive(Debug)]
|
|
struct CpuMetric {
|
|
hostname: String,
|
|
environment: String,
|
|
usage_user: f64,
|
|
usage_system: f64,
|
|
usage_idle: f64,
|
|
ts: i64,
|
|
}
|
|
|
|
impl CpuMetric {
|
|
fn new(
|
|
hostname: String,
|
|
environment: String,
|
|
usage_user: f64,
|
|
usage_system: f64,
|
|
usage_idle: f64,
|
|
ts: i64,
|
|
) -> Self {
|
|
Self {
|
|
hostname,
|
|
environment,
|
|
usage_user,
|
|
usage_system,
|
|
usage_idle,
|
|
ts,
|
|
}
|
|
}
|
|
}
|
|
common_telemetry::init_default_ut_logging();
|
|
|
|
let (mut guard, fe_mysql_server) =
|
|
setup_mysql_server(store_type, "test_mysql_async_timestamp").await;
|
|
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
|
|
|
|
let url = format!("mysql://{addr}/public");
|
|
let opts = mysql_async::Opts::from_url(&url).unwrap();
|
|
let mut conn = mysql_async::Conn::new(opts)
|
|
.await
|
|
.expect("create connection failure");
|
|
|
|
r"CREATE TABLE IF NOT EXISTS cpu_metrics (
|
|
hostname STRING,
|
|
environment STRING,
|
|
usage_user DOUBLE,
|
|
usage_system DOUBLE,
|
|
usage_idle DOUBLE,
|
|
ts TIMESTAMP,
|
|
TIME INDEX(ts),
|
|
PRIMARY KEY(hostname, environment)
|
|
);"
|
|
.ignore(&mut conn)
|
|
.await
|
|
.expect("create table failure");
|
|
|
|
let metrics = [
|
|
CpuMetric::new(
|
|
"host0".into(),
|
|
"test".into(),
|
|
32f64,
|
|
3f64,
|
|
4f64,
|
|
1680307200050,
|
|
),
|
|
CpuMetric::new(
|
|
"host1".into(),
|
|
"test".into(),
|
|
29f64,
|
|
32f64,
|
|
50f64,
|
|
1680307200050,
|
|
),
|
|
CpuMetric::new(
|
|
"host0".into(),
|
|
"test".into(),
|
|
32f64,
|
|
3f64,
|
|
4f64,
|
|
1680307260050,
|
|
),
|
|
CpuMetric::new(
|
|
"host1".into(),
|
|
"test".into(),
|
|
29f64,
|
|
32f64,
|
|
50f64,
|
|
1680307260050,
|
|
),
|
|
CpuMetric::new(
|
|
"host0".into(),
|
|
"test".into(),
|
|
32f64,
|
|
3f64,
|
|
4f64,
|
|
1680307320050,
|
|
),
|
|
CpuMetric::new(
|
|
"host1".into(),
|
|
"test".into(),
|
|
29f64,
|
|
32f64,
|
|
50f64,
|
|
1680307320050,
|
|
),
|
|
];
|
|
|
|
r"INSERT INTO cpu_metrics (hostname, environment, usage_user, usage_system, usage_idle, ts)
|
|
VALUES (:hostname, :environment, :usage_user, :usage_system, :usage_idle, :ts)"
|
|
.with(metrics.iter().map(|metric| {
|
|
params! {
|
|
"hostname" => &metric.hostname,
|
|
"environment" => &metric.environment,
|
|
"usage_user" => metric.usage_user,
|
|
"usage_system" => metric.usage_system,
|
|
"usage_idle" => metric.usage_idle,
|
|
"ts" => metric.ts,
|
|
}
|
|
}))
|
|
.batch(&mut conn)
|
|
.await
|
|
.expect("insert data failure");
|
|
|
|
// query data
|
|
let loaded_metrics = "SELECT * FROM cpu_metrics"
|
|
.with(())
|
|
.map(
|
|
&mut conn,
|
|
|(hostname, environment, usage_user, usage_system, usage_idle, raw_ts): (
|
|
String,
|
|
String,
|
|
f64,
|
|
f64,
|
|
f64,
|
|
PrimitiveDateTime,
|
|
)| {
|
|
let ts = raw_ts.assume_utc().unix_timestamp() * 1000;
|
|
CpuMetric::new(
|
|
hostname,
|
|
environment,
|
|
usage_user,
|
|
usage_system,
|
|
usage_idle,
|
|
ts,
|
|
)
|
|
},
|
|
)
|
|
.await
|
|
.expect("query data failure");
|
|
assert_eq!(loaded_metrics.len(), 6);
|
|
|
|
let _ = fe_mysql_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_mysql_prepare_stmt_insert_timestamp(store_type: StorageType) {
|
|
let (mut guard, server) =
|
|
setup_mysql_server(store_type, "test_mysql_prepare_stmt_insert_timestamp").await;
|
|
let addr = server.bind_addr().unwrap().to_string();
|
|
|
|
let pool = MySqlPoolOptions::new()
|
|
.max_connections(2)
|
|
.connect(&format!("mysql://{addr}/public"))
|
|
.await
|
|
.unwrap();
|
|
|
|
sqlx::query("create table demo(i bigint, ts timestamp time index)")
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Valid timestamp binary encoding: https://mariadb.com/kb/en/resultset-row/#timestamp-binary-encoding
|
|
|
|
// Timestamp data length = 4, year-month-day(ymd) only:
|
|
sqlx::query("insert into demo values(?, ?)")
|
|
.bind(0)
|
|
.bind(
|
|
NaiveDate::from_ymd_opt(2023, 12, 19)
|
|
// Though hour, minute and second are provided, `sqlx` will not encode them if they are all zeroes,
|
|
// which is just what we desire here.
|
|
// See https://github.com/launchbadge/sqlx/blob/bb064e3789d68ad4e9affe7cba34944abb000f72/sqlx-core/src/mysql/types/chrono.rs#L186C22-L186C22
|
|
.and_then(|x| x.and_hms_opt(0, 0, 0))
|
|
.unwrap(),
|
|
)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Timestamp data length = 7, ymd and hour-minute-second(hms):
|
|
sqlx::query("insert into demo values(?, ?)")
|
|
.bind(1)
|
|
.bind(
|
|
NaiveDate::from_ymd_opt(2023, 12, 19)
|
|
.and_then(|x| x.and_hms_opt(13, 19, 1))
|
|
.unwrap(),
|
|
)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Timestamp data length = 11, ymd, hms and microseconds:
|
|
sqlx::query("insert into demo values(?, ?)")
|
|
.bind(2)
|
|
.bind(
|
|
NaiveDate::from_ymd_opt(2023, 12, 19)
|
|
.and_then(|x| x.and_hms_micro_opt(13, 20, 1, 123456))
|
|
.unwrap(),
|
|
)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
let rows = sqlx::query("select i, ts from demo order by i")
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(rows.len(), 3);
|
|
|
|
let x: DateTime<Utc> = rows[0].get("ts");
|
|
assert_eq!(x.to_string(), "2023-12-19 00:00:00 UTC");
|
|
|
|
let x: DateTime<Utc> = rows[1].get("ts");
|
|
assert_eq!(x.to_string(), "2023-12-19 13:19:01 UTC");
|
|
|
|
let x: DateTime<Utc> = rows[2].get("ts");
|
|
assert_eq!(x.to_string(), "2023-12-19 13:20:01.123 UTC");
|
|
|
|
let _ = server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_postgres_array_types(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_array_types").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
|
|
.await
|
|
.unwrap();
|
|
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
tokio::spawn(async move {
|
|
connection.await.unwrap();
|
|
tx.send(()).unwrap();
|
|
});
|
|
|
|
let rows = client
|
|
.query(
|
|
"SELECT arrow_cast(1, 'List(Int8)'), arrow_cast('tom', 'List(Utf8)'), arrow_cast(3.14, 'List(Float32)'), arrow_cast('2023-01-02T12:53:02', 'List(Timestamp(Millisecond, None))')",
|
|
&[],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(1, rows.len());
|
|
|
|
// Shutdown the client.
|
|
drop(client);
|
|
rx.await.unwrap();
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|
|
|
|
pub async fn test_declare_fetch_close_cursor(store_type: StorageType) {
|
|
let (mut guard, fe_pg_server) =
|
|
setup_pg_server(store_type, "test_declare_fetch_close_cursor").await;
|
|
let addr = fe_pg_server.bind_addr().unwrap().to_string();
|
|
|
|
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
|
|
.await
|
|
.unwrap();
|
|
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
tokio::spawn(async move {
|
|
connection.await.unwrap();
|
|
tx.send(()).unwrap();
|
|
});
|
|
|
|
client
|
|
.execute(
|
|
"DECLARE c1 CURSOR FOR SELECT * FROM numbers WHERE number > 2 LIMIT 50::bigint",
|
|
&[],
|
|
)
|
|
.await
|
|
.expect("declare cursor");
|
|
|
|
// duplicated cursor
|
|
assert!(
|
|
client
|
|
.execute("DECLARE c1 CURSOR FOR SELECT 1", &[],)
|
|
.await
|
|
.is_err()
|
|
);
|
|
|
|
let rows = client.query("FETCH 5 FROM c1", &[]).await.unwrap();
|
|
assert_eq!(5, rows.len());
|
|
|
|
let rows = client.query("FETCH 100 FROM c1", &[]).await.unwrap();
|
|
assert_eq!(45, rows.len());
|
|
|
|
let rows = client.query("FETCH 100 FROM c1", &[]).await.unwrap();
|
|
assert_eq!(0, rows.len());
|
|
|
|
client.execute("CLOSE c1", &[]).await.expect("close cursor");
|
|
|
|
// cursor not found
|
|
let result = client.query("FETCH 100 FROM c1", &[]).await;
|
|
assert!(result.is_err());
|
|
|
|
client
|
|
.execute(
|
|
"DECLARE c2 CURSOR FOR SELECT * FROM numbers WHERE number < 0",
|
|
&[],
|
|
)
|
|
.await
|
|
.expect("declare cursor");
|
|
|
|
let rows = client.query("FETCH 5 FROM c2", &[]).await.unwrap();
|
|
assert_eq!(0, rows.len());
|
|
|
|
client.execute("CLOSE c2", &[]).await.expect("close cursor");
|
|
|
|
// Shutdown the client.
|
|
drop(client);
|
|
rx.await.unwrap();
|
|
|
|
let _ = fe_pg_server.shutdown().await;
|
|
guard.remove_all().await;
|
|
}
|