mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 20:10:37 +00:00
fix: support restart sqlness in distributed mode (#1443)
* fix: support restart sqlness in distributed mode Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * move alter_table case to common dir Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * is_standalone flag Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * Update tests/runner/src/env.rs Co-authored-by: LFC <bayinamine@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
@@ -1,46 +0,0 @@
|
||||
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
+-------+-------+------+---------+---------------+
|
||||
| Field | Type | Null | Default | Semantic Type |
|
||||
+-------+-------+------+---------+---------------+
|
||||
| i | Int32 | YES | | FIELD |
|
||||
| j | Int64 | NO | | TIME INDEX |
|
||||
+-------+-------+------+---------+---------------+
|
||||
|
||||
ALTER TABLE t ADD COLUMN k INTEGER;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
+-------+-------+------+---------+---------------+
|
||||
| Field | Type | Null | Default | Semantic Type |
|
||||
+-------+-------+------+---------+---------------+
|
||||
| i | Int32 | YES | | FIELD |
|
||||
| j | Int64 | NO | | TIME INDEX |
|
||||
| k | Int32 | YES | | FIELD |
|
||||
+-------+-------+------+---------+---------------+
|
||||
|
||||
ALTER TABLE t ADD COLUMN m INTEGER;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
+-------+-------+------+---------+---------------+
|
||||
| Field | Type | Null | Default | Semantic Type |
|
||||
+-------+-------+------+---------+---------------+
|
||||
| i | Int32 | YES | | FIELD |
|
||||
| j | Int64 | NO | | TIME INDEX |
|
||||
| k | Int32 | YES | | FIELD |
|
||||
| m | Int32 | YES | | FIELD |
|
||||
+-------+-------+------+---------+---------------+
|
||||
|
||||
DROP TABLE t;
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
ALTER TABLE t ADD COLUMN k INTEGER;
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
ALTER TABLE t ADD COLUMN m INTEGER;
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
DROP TABLE t;
|
||||
@@ -77,13 +77,8 @@ impl Env {
|
||||
|
||||
let db_ctx = GreptimeDBContext::new();
|
||||
|
||||
let mut server_process = Self::start_server("standalone", &db_ctx, true);
|
||||
let server_process = Self::start_server("standalone", &db_ctx, true).await;
|
||||
|
||||
let is_up = util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(10)).await;
|
||||
if !is_up {
|
||||
Env::stop_server(&mut server_process).await;
|
||||
panic!("Server doesn't up in 10 seconds, quit.")
|
||||
}
|
||||
println!("Started, going to test. Log will be write to {STANDALONE_LOG_FILE}");
|
||||
|
||||
let client = Client::with_urls(vec![SERVER_ADDR]);
|
||||
@@ -95,6 +90,7 @@ impl Env {
|
||||
frontend_process: None,
|
||||
client: Mutex::new(db),
|
||||
ctx: db_ctx,
|
||||
is_standalone: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,25 +100,9 @@ impl Env {
|
||||
let db_ctx = GreptimeDBContext::new();
|
||||
|
||||
// start a distributed GreptimeDB
|
||||
let mut meta_server = Env::start_server("metasrv", &db_ctx, true);
|
||||
if !util::check_port(METASRV_ADDR.parse().unwrap(), Duration::from_secs(10)).await {
|
||||
Env::stop_server(&mut meta_server).await;
|
||||
panic!("Metasrv doesn't up in 10 seconds, quit.")
|
||||
}
|
||||
|
||||
let mut datanode = Env::start_server("datanode", &db_ctx, true);
|
||||
// Wait for default catalog and schema being created.
|
||||
// Can be removed once #1265 resolved, and merged with Frontend start checking below.
|
||||
if !util::check_port(DATANODE_ADDR.parse().unwrap(), Duration::from_secs(10)).await {
|
||||
Env::stop_server(&mut datanode).await;
|
||||
panic!("Datanode doesn't up in 10 seconds, quit.")
|
||||
}
|
||||
|
||||
let mut frontend = Env::start_server("frontend", &db_ctx, true);
|
||||
if !util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(10)).await {
|
||||
Env::stop_server(&mut frontend).await;
|
||||
panic!("Frontend doesn't up in 10 seconds, quit.")
|
||||
}
|
||||
let meta_server = Env::start_server("metasrv", &db_ctx, true).await;
|
||||
let datanode = Env::start_server("datanode", &db_ctx, true).await;
|
||||
let frontend = Env::start_server("frontend", &db_ctx, true).await;
|
||||
|
||||
let client = Client::with_urls(vec![SERVER_ADDR]);
|
||||
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
@@ -133,6 +113,7 @@ impl Env {
|
||||
frontend_process: Some(frontend),
|
||||
client: Mutex::new(db),
|
||||
ctx: db_ctx,
|
||||
is_standalone: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,7 +122,11 @@ impl Env {
|
||||
let _ = process.wait().await;
|
||||
}
|
||||
|
||||
fn start_server(subcommand: &str, db_ctx: &GreptimeDBContext, truncate_log: bool) -> Child {
|
||||
async fn start_server(
|
||||
subcommand: &str,
|
||||
db_ctx: &GreptimeDBContext,
|
||||
truncate_log: bool,
|
||||
) -> Child {
|
||||
let log_file_name = match subcommand {
|
||||
"datanode" => DATANODE_LOG_FILE,
|
||||
"frontend" => FRONTEND_LOG_FILE,
|
||||
@@ -178,12 +163,26 @@ impl Env {
|
||||
_ => panic!("Unexpected subcommand: {subcommand}"),
|
||||
}
|
||||
|
||||
let process = Command::new("./greptime")
|
||||
let mut process = Command::new("./greptime")
|
||||
.current_dir(util::get_binary_dir("debug"))
|
||||
.args(args)
|
||||
.stdout(log_file)
|
||||
.spawn()
|
||||
.unwrap_or_else(|_| panic!("Failed to start the DB with subcommand {subcommand}"));
|
||||
|
||||
// check connection
|
||||
let ip_addr = match subcommand {
|
||||
"datanode" => DATANODE_ADDR,
|
||||
"frontend" => SERVER_ADDR,
|
||||
"metasrv" => METASRV_ADDR,
|
||||
"standalone" => SERVER_ADDR,
|
||||
_ => panic!("Unexpected subcommand: {subcommand}"),
|
||||
};
|
||||
if !util::check_port(ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
|
||||
Env::stop_server(&mut process).await;
|
||||
panic!("{subcommand} doesn't up in 10 seconds, quit.")
|
||||
}
|
||||
|
||||
process
|
||||
}
|
||||
|
||||
@@ -191,14 +190,15 @@ impl Env {
|
||||
async fn restart_server(db: &GreptimeDB) {
|
||||
let mut server_process = db.server_process.lock().await;
|
||||
Env::stop_server(&mut server_process).await;
|
||||
let new_server_process = Env::start_server("standalone", &db.ctx, false);
|
||||
*server_process = new_server_process;
|
||||
|
||||
let is_up = util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(2)).await;
|
||||
if !is_up {
|
||||
Env::stop_server(&mut server_process).await;
|
||||
panic!("Server doesn't up in 10 seconds, quit.")
|
||||
}
|
||||
// check if the server is distributed or standalone
|
||||
let subcommand = if db.is_standalone {
|
||||
"standalone"
|
||||
} else {
|
||||
"datanode"
|
||||
};
|
||||
let new_server_process = Env::start_server(subcommand, &db.ctx, false).await;
|
||||
*server_process = new_server_process;
|
||||
}
|
||||
|
||||
/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
|
||||
@@ -254,6 +254,7 @@ pub struct GreptimeDB {
|
||||
frontend_process: Option<Child>,
|
||||
client: Mutex<DB>,
|
||||
ctx: GreptimeDBContext,
|
||||
is_standalone: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
Reference in New Issue
Block a user