feat: adopt REPLACE interceptor and quit all processes on exit (#1478)

* bump version and update test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* quit all processes on drop

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update tests/runner/src/env.rs

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
Ruihang Xia
2023-04-27 15:16:41 +08:00
committed by GitHub
parent bf35620904
commit 939a51aea9
9 changed files with 63 additions and 108 deletions

79
Cargo.lock generated
View File

@@ -696,7 +696,7 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bb524613be645939e280b7279f7b017f98cf7f5ef084ec374df373530e73277"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro2",
"quote",
"syn 2.0.15",
@@ -1409,7 +1409,7 @@ version = "3.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
@@ -1422,7 +1422,7 @@ version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9644cd56d6b87dbe899ef8b053e331c0637664e9e21a33dfcdc36093f5c5c4"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro2",
"quote",
"syn 2.0.15",
@@ -1865,7 +1865,7 @@ version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60"
dependencies = [
"encode_unicode 0.3.6",
"encode_unicode",
"lazy_static",
"libc",
"unicode-width",
@@ -2745,12 +2745,6 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "encode_unicode"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
[[package]]
name = "encoding_rs"
version = "0.8.32"
@@ -3898,15 +3892,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "heck"
version = "0.4.1"
@@ -5030,7 +5015,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1f30203977ce6134381bd895ba82892f967578442a0894484858594de992"
dependencies = [
"darling",
"heck 0.4.1",
"heck",
"num-bigint",
"proc-macro-crate 1.3.1",
"proc-macro-error",
@@ -6097,8 +6082,6 @@ checksum = "8ff1fec61082821f8236cf6c0c14e8172b62ce8a72a0eedc30d3b247bb68dc11"
dependencies = [
"ansi_term",
"pad",
"prettytable-rs",
"structopt",
]
[[package]]
@@ -6121,20 +6104,6 @@ dependencies = [
"syn 2.0.15",
]
[[package]]
name = "prettytable-rs"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eea25e07510aa6ab6547308ebe3c036016d162b8da920dbb079e3ba8acf3d95a"
dependencies = [
"csv",
"encode_unicode 1.0.0",
"is-terminal",
"lazy_static",
"term",
"unicode-width",
]
[[package]]
name = "priority-queue"
version = "1.3.1"
@@ -6288,7 +6257,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
dependencies = [
"bytes",
"heck 0.4.1",
"heck",
"itertools",
"lazy_static",
"log",
@@ -8127,7 +8096,7 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro2",
"quote",
"syn 1.0.109",
@@ -8220,14 +8189,12 @@ dependencies = [
[[package]]
name = "sqlness"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d4907079f624d8c150ef65e14e833bf7f6afb0f4943b2c35d5de9ff44b4423"
source = "git+https://github.com/CeresDB/sqlness.git?rev=dde4b19d7e4a41319d05a0c5bfae5c4422fde14f#dde4b19d7e4a41319d05a0c5bfae5c4422fde14f"
dependencies = [
"async-trait",
"derive_builder 0.11.2",
"prettydiff",
"regex",
"serde",
"thiserror",
"toml",
"walkdir",
@@ -8436,30 +8403,6 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "structopt"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10"
dependencies = [
"clap 2.34.0",
"lazy_static",
"structopt-derive",
]
[[package]]
name = "structopt-derive"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0"
dependencies = [
"heck 0.3.3",
"proc-macro-error",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "strum"
version = "0.24.1"
@@ -8475,7 +8418,7 @@ version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
dependencies = [
"heck 0.4.1",
"heck",
"proc-macro2",
"quote",
"rustversion",
@@ -8524,7 +8467,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ae64fb7ad0670c7d6d53d57b1b91beb2212afc30e164cc8edb02d6b2cff32a"
dependencies = [
"gix",
"heck 0.4.1",
"heck",
"prettyplease 0.2.4",
"prost",
"prost-build",
@@ -9466,7 +9409,7 @@ version = "0.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95d27d749378ceab6ec22188ed7ad102205c89ddb92ab662371c850ffc71aa1a"
dependencies = [
"heck 0.4.1",
"heck",
"log",
"proc-macro2",
"quote",

View File

@@ -18,13 +18,12 @@ use std::sync::Arc;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, Location, OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::client::{load_balance as lb, Id};
use crate::error;
use crate::error::Error::TonicStatus;
use crate::error::Result;
#[derive(Clone, Debug)]
@@ -123,15 +122,8 @@ impl Inner {
async fn delete(&self, mut req: DeleteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
let res = client.delete(req).await.map_err(|mut source| {
// FIXME(hl): here intentionally clear the metadata field so that error date does not changes which will break sqlness test.
// we can remove this hack as soon as either: sqlness supports regex result match or greptimedb supports renaming table routes
source.metadata_mut().clear();
TonicStatus {
source,
location: Location::default(),
}
})?;
let res = client.delete(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}

View File

@@ -33,7 +33,8 @@ DROP TABLE t;
Error: 4001(TableNotFound), Table not found: greptime.public.t
-- SQLNESS REPLACE details.*
DROP TABLE new_table;
Error: 1003(Internal), status: Internal, message: "Table route not found: __meta_table_route-greptime-public-new_table-1025", details: [], metadata: MetadataMap { headers: {} }
Error: 1003(Internal), status: Internal, message: "Table route not found: __meta_table_route-greptime-public-new_table-1025",

View File

@@ -12,4 +12,5 @@ ALTER TABLE t RENAME new_table;
DROP TABLE t;
-- TODO: this clause should success
-- SQLNESS REPLACE details.*
DROP TABLE new_table;

View File

@@ -21,7 +21,7 @@ Error: 3000(PlanQuery), Error during planning: For SELECT DISTINCT, ORDER BY exp
SELECT DISTINCT ON (1) i % 2, i FROM integers WHERE i<3 ORDER BY i;
Error: 1001(Unsupported), SQL statement is not supported: SELECT DISTINCT ON (1) i % 2, i FROM integers WHERE i<3 ORDER BY i;, keyword: %
Error: 1001(Unsupported), SQL statement is not supported: SELECT DISTINCT ON (1) i % 2, i FROM integers WHERE i<3 ORDER BY i;, keyword: %
SELECT DISTINCT integers.i FROM integers ORDER BY i DESC;

View File

@@ -101,7 +101,7 @@ SHOW TABLES FROM public;
DROP SCHEMA test_public_schema;
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_public_schema;, keyword: SCHEMA
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_public_schema;, keyword: SCHEMA
SELECT * FROM test_public_schema.hello;

View File

@@ -13,6 +13,6 @@ common-grpc = { path = "../../src/common/grpc" }
common-query = { path = "../../src/common/query" }
common-time = { path = "../../src/common/time" }
serde.workspace = true
sqlness = "0.4"
sqlness = { git = "https://github.com/CeresDB/sqlness.git", rev = "dde4b19d7e4a41319d05a0c5bfae5c4422fde14f" }
tinytemplate = "1.2"
tokio.workspace = true

View File

@@ -16,6 +16,9 @@ use std::fmt::Display;
use std::fs::OpenOptions;
use std::path::{Path, PathBuf};
use std::process::Stdio;
// use tokio::process::{Child, Command};
use std::process::{Child, Command};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
@@ -28,8 +31,7 @@ use common_query::Output;
use serde::Serialize;
use sqlness::{Database, EnvController, QueryContext};
use tinytemplate::TinyTemplate;
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use tokio::sync::Mutex as TokioMutex;
use crate::util;
@@ -58,15 +60,7 @@ impl EnvController for Env {
/// Stop one [`Database`].
async fn stop(&self, _mode: &str, mut database: Self::DB) {
let mut server = database.server_process.lock().await;
Env::stop_server(&mut server).await;
if let Some(mut metasrv) = database.metasrv_process.take() {
Env::stop_server(&mut metasrv).await;
}
if let Some(mut datanode) = database.frontend_process.take() {
Env::stop_server(&mut datanode).await;
}
println!("Stopped DB.");
database.stop();
}
}
@@ -85,10 +79,10 @@ impl Env {
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
server_process: Mutex::new(server_process),
server_process: Arc::new(Mutex::new(server_process)),
metasrv_process: None,
frontend_process: None,
client: Mutex::new(db),
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: true,
}
@@ -108,18 +102,18 @@ impl Env {
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
server_process: Mutex::new(datanode),
server_process: Arc::new(Mutex::new(datanode)),
metasrv_process: Some(meta_server),
frontend_process: Some(frontend),
client: Mutex::new(db),
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: false,
}
}
async fn stop_server(process: &mut Child) {
process.kill().await.unwrap();
let _ = process.wait().await;
fn stop_server(process: &mut Child) {
let _ = process.kill();
let _ = process.wait();
}
async fn start_server(
@@ -179,7 +173,7 @@ impl Env {
_ => panic!("Unexpected subcommand: {subcommand}"),
};
if !util::check_port(ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut process).await;
Env::stop_server(&mut process);
panic!("{subcommand} doesn't up in 10 seconds, quit.")
}
@@ -188,8 +182,10 @@ impl Env {
/// stop and restart the server process
async fn restart_server(db: &GreptimeDB) {
let mut server_process = db.server_process.lock().await;
Env::stop_server(&mut server_process).await;
{
let mut server_process = db.server_process.lock().unwrap();
Env::stop_server(&mut server_process);
}
// check if the server is distributed or standalone
let subcommand = if db.is_standalone {
@@ -198,6 +194,8 @@ impl Env {
"datanode"
};
let new_server_process = Env::start_server(subcommand, &db.ctx, false).await;
let mut server_process = db.server_process.lock().unwrap();
*server_process = new_server_process;
}
@@ -240,7 +238,6 @@ impl Env {
.args(["build", "--bin", "greptime"])
.stdout(Stdio::null())
.output()
.await
.expect("Failed to start GreptimeDB")
.status;
if !cargo_build_result.success() {
@@ -251,10 +248,10 @@ impl Env {
}
pub struct GreptimeDB {
server_process: Mutex<Child>,
server_process: Arc<Mutex<Child>>,
metasrv_process: Option<Child>,
frontend_process: Option<Child>,
client: Mutex<DB>,
client: TokioMutex<DB>,
ctx: GreptimeDBContext,
is_standalone: bool,
}
@@ -281,6 +278,27 @@ impl Database for GreptimeDB {
}
}
impl GreptimeDB {
#![allow(clippy::print_stdout)]
fn stop(&mut self) {
let mut server = self.server_process.lock().unwrap();
Env::stop_server(&mut server);
if let Some(mut metasrv) = self.metasrv_process.take() {
Env::stop_server(&mut metasrv);
}
if let Some(mut datanode) = self.frontend_process.take() {
Env::stop_server(&mut datanode);
}
println!("Stopped DB.");
}
}
impl Drop for GreptimeDB {
fn drop(&mut self) {
self.stop();
}
}
struct GreptimeDBContext {
/// Start time in millisecond
time: i64,

View File

@@ -34,6 +34,6 @@ async fn main() {
.follow_links(true)
.build()
.unwrap();
let runner = Runner::new_with_config(config, Env {}).await.unwrap();
let runner = Runner::new(config, Env {});
runner.run().await.unwrap();
}