mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat: implement restart argument for sqlness-runner (#1262)
* refactor standalone mode and distribute mode start process Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * implement restart arg Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * Update tests/runner/src/env.rs Co-authored-by: LFC <bayinamine@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
@@ -50,6 +50,7 @@ DESC TABLE new_table;
|
||||
| j | Int64 | NO | | TIME INDEX |
|
||||
+-------+-------+------+---------+---------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT * FROM new_table;
|
||||
|
||||
+---+---+
|
||||
|
||||
@@ -16,6 +16,7 @@ CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
|
||||
|
||||
DESC TABLE new_table;
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT * FROM new_table;
|
||||
|
||||
ALTER TABLE new_table RENAME new_table;
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::util;
|
||||
const DATANODE_ADDR: &str = "127.0.0.1:4100";
|
||||
const METASRV_ADDR: &str = "127.0.0.1:3002";
|
||||
const SERVER_ADDR: &str = "127.0.0.1:4001";
|
||||
const SERVER_LOG_FILE: &str = "/tmp/greptime-sqlness.log";
|
||||
const STANDALONE_LOG_FILE: &str = "/tmp/greptime-sqlness-standalone.log";
|
||||
const METASRV_LOG_FILE: &str = "/tmp/greptime-sqlness-metasrv.log";
|
||||
const FRONTEND_LOG_FILE: &str = "/tmp/greptime-sqlness-frontend.log";
|
||||
const DATANODE_LOG_FILE: &str = "/tmp/greptime-sqlness-datanode.log";
|
||||
@@ -58,12 +58,12 @@ impl EnvController for Env {
|
||||
|
||||
/// Stop one [`Database`].
|
||||
async fn stop(&self, _mode: &str, mut database: Self::DB) {
|
||||
let mut server = database.server_process;
|
||||
let mut server = database.server_process.lock().await;
|
||||
Env::stop_server(&mut server).await;
|
||||
if let Some(mut metasrv) = database.metasrv_process.take() {
|
||||
Env::stop_server(&mut metasrv).await;
|
||||
}
|
||||
if let Some(mut datanode) = database.datanode_process.take() {
|
||||
if let Some(mut datanode) = database.frontend_process.take() {
|
||||
Env::stop_server(&mut datanode).await;
|
||||
}
|
||||
println!("Stopped DB.");
|
||||
@@ -73,103 +73,42 @@ impl EnvController for Env {
|
||||
#[allow(clippy::print_stdout)]
|
||||
impl Env {
|
||||
pub async fn start_standalone() -> GreptimeDB {
|
||||
// Build the DB with `cargo build --bin greptime`
|
||||
println!("Going to build the DB...");
|
||||
let cargo_build_result = Command::new("cargo")
|
||||
.current_dir(util::get_workspace_root())
|
||||
.args(["build", "--bin", "greptime"])
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.await
|
||||
.expect("Failed to start GreptimeDB")
|
||||
.status;
|
||||
if !cargo_build_result.success() {
|
||||
panic!("Failed to build GreptimeDB (`cargo build` fails)");
|
||||
}
|
||||
println!("Build finished, starting...");
|
||||
Self::build_db().await;
|
||||
|
||||
// Open log file (build logs will be truncated).
|
||||
let log_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(SERVER_LOG_FILE)
|
||||
.unwrap_or_else(|_| panic!("Cannot open log file at {SERVER_LOG_FILE}"));
|
||||
let db_ctx = GreptimeDBContext::new();
|
||||
|
||||
let conf = Self::generate_standalone_config_file();
|
||||
// Start the DB
|
||||
let server_process = Command::new("./greptime")
|
||||
.current_dir(util::get_binary_dir("debug"))
|
||||
.args(["--log-level=debug", "standalone", "start", "-c", &conf])
|
||||
.stdout(log_file)
|
||||
.spawn()
|
||||
.expect("Failed to start the DB");
|
||||
let mut server_process = Self::start_server("standalone", &db_ctx, true);
|
||||
|
||||
let is_up = util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(10)).await;
|
||||
if !is_up {
|
||||
Env::stop_server(&mut server_process).await;
|
||||
panic!("Server doesn't up in 10 seconds, quit.")
|
||||
}
|
||||
println!("Started, going to test. Log will be write to {SERVER_LOG_FILE}");
|
||||
println!("Started, going to test. Log will be write to {STANDALONE_LOG_FILE}");
|
||||
|
||||
let client = Client::with_urls(vec![SERVER_ADDR]);
|
||||
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
|
||||
GreptimeDB {
|
||||
server_process,
|
||||
server_process: Mutex::new(server_process),
|
||||
metasrv_process: None,
|
||||
datanode_process: None,
|
||||
frontend_process: None,
|
||||
client: Mutex::new(db),
|
||||
ctx: db_ctx,
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_standalone_config_file() -> String {
|
||||
let mut tt = TinyTemplate::new();
|
||||
|
||||
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
||||
path.push("../conf/standalone-test.toml.template");
|
||||
let template = std::fs::read_to_string(path).unwrap();
|
||||
tt.add_template("standalone", &template).unwrap();
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Context {
|
||||
wal_dir: String,
|
||||
data_dir: String,
|
||||
}
|
||||
|
||||
let current_time = common_time::util::current_time_millis();
|
||||
let greptimedb_dir = format!("/tmp/greptimedb-{current_time}");
|
||||
let ctx = Context {
|
||||
wal_dir: format!("{greptimedb_dir}/wal/"),
|
||||
data_dir: format!("{greptimedb_dir}/data/"),
|
||||
};
|
||||
let rendered = tt.render("standalone", &ctx).unwrap();
|
||||
|
||||
let conf_file = format!("/tmp/standalone-{current_time}.toml");
|
||||
println!("Generating standalone config file in {conf_file}, full content:\n{rendered}");
|
||||
std::fs::write(&conf_file, rendered).unwrap();
|
||||
|
||||
conf_file
|
||||
}
|
||||
|
||||
pub async fn start_distributed() -> GreptimeDB {
|
||||
let cargo_build_result = Command::new("cargo")
|
||||
.current_dir(util::get_workspace_root())
|
||||
.args(["build", "--bin", "greptime"])
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.await
|
||||
.expect("Failed to start GreptimeDB")
|
||||
.status;
|
||||
if !cargo_build_result.success() {
|
||||
panic!("Failed to build GreptimeDB (`cargo build` fails)");
|
||||
}
|
||||
Self::build_db().await;
|
||||
|
||||
let db_ctx = GreptimeDBContext::new();
|
||||
|
||||
// start a distributed GreptimeDB
|
||||
let mut meta_server = Env::start_server("metasrv");
|
||||
let mut meta_server = Env::start_server("metasrv", &db_ctx, true);
|
||||
// wait for election
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let mut frontend = Env::start_server("frontend");
|
||||
let mut datanode = Env::start_server("datanode");
|
||||
let mut frontend = Env::start_server("frontend", &db_ctx, true);
|
||||
let mut datanode = Env::start_server("datanode", &db_ctx, true);
|
||||
|
||||
for addr in [DATANODE_ADDR, METASRV_ADDR, SERVER_ADDR].iter() {
|
||||
let is_up = util::check_port(addr.parse().unwrap(), Duration::from_secs(10)).await;
|
||||
@@ -185,10 +124,11 @@ impl Env {
|
||||
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
|
||||
GreptimeDB {
|
||||
server_process: frontend,
|
||||
server_process: Mutex::new(datanode),
|
||||
metasrv_process: Some(meta_server),
|
||||
datanode_process: Some(datanode),
|
||||
frontend_process: Some(frontend),
|
||||
client: Mutex::new(db),
|
||||
ctx: db_ctx,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,46 +137,68 @@ impl Env {
|
||||
let _ = process.wait().await;
|
||||
}
|
||||
|
||||
fn start_server(subcommand: &str) -> Child {
|
||||
fn start_server(subcommand: &str, db_ctx: &GreptimeDBContext, truncate_log: bool) -> Child {
|
||||
let log_file_name = match subcommand {
|
||||
"datanode" => DATANODE_LOG_FILE,
|
||||
"frontend" => FRONTEND_LOG_FILE,
|
||||
"metasrv" => METASRV_LOG_FILE,
|
||||
"standalone" => STANDALONE_LOG_FILE,
|
||||
_ => panic!("Unexpected subcommand: {subcommand}"),
|
||||
};
|
||||
let log_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.truncate(truncate_log)
|
||||
.open(log_file_name)
|
||||
.unwrap_or_else(|_| panic!("Cannot open log file at {log_file_name}"));
|
||||
|
||||
let mut args = vec![subcommand.to_string(), "start".to_string()];
|
||||
if subcommand == "frontend" {
|
||||
args.push("--metasrv-addr=0.0.0.0:3002".to_string())
|
||||
} else if subcommand == "datanode" {
|
||||
args.push("-c".to_string());
|
||||
args.push(Self::generate_datanode_config_file());
|
||||
} else if subcommand == "metasrv" {
|
||||
args.push("--use-memory-store".to_string());
|
||||
};
|
||||
let mut args = vec![
|
||||
"--log-level=debug".to_string(),
|
||||
subcommand.to_string(),
|
||||
"start".to_string(),
|
||||
];
|
||||
match subcommand {
|
||||
"datanode" | "standalone" => {
|
||||
args.push("-c".to_string());
|
||||
args.push(Self::generate_config_file(subcommand, db_ctx));
|
||||
}
|
||||
"frontend" => args.push("--metasrv-addr=0.0.0.0:3002".to_string()),
|
||||
"metasrv" => args.push("--use-memory-store".to_string()),
|
||||
|
||||
_ => panic!("Unexpected subcommand: {subcommand}"),
|
||||
}
|
||||
|
||||
let process = Command::new("./greptime")
|
||||
.current_dir(util::get_binary_dir("debug"))
|
||||
.args(args)
|
||||
.stdout(log_file)
|
||||
.spawn()
|
||||
.expect("Failed to start the DB");
|
||||
.unwrap_or_else(|_| panic!("Failed to start the DB with subcommand {subcommand}"));
|
||||
process
|
||||
}
|
||||
|
||||
fn generate_datanode_config_file() -> String {
|
||||
/// stop and restart the server process
|
||||
async fn restart_server(db: &GreptimeDB) {
|
||||
let mut server_process = db.server_process.lock().await;
|
||||
Env::stop_server(&mut server_process).await;
|
||||
let new_server_process = Env::start_server("standalone", &db.ctx, false);
|
||||
*server_process = new_server_process;
|
||||
|
||||
let is_up = util::check_port(SERVER_ADDR.parse().unwrap(), Duration::from_secs(2)).await;
|
||||
if !is_up {
|
||||
Env::stop_server(&mut server_process).await;
|
||||
panic!("Server doesn't up in 10 seconds, quit.")
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
|
||||
fn generate_config_file(subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
|
||||
let mut tt = TinyTemplate::new();
|
||||
|
||||
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
||||
path.push("../conf/datanode-test.toml.template");
|
||||
path.push(format!("../conf/{subcommand}-test.toml.template"));
|
||||
let template = std::fs::read_to_string(path).unwrap();
|
||||
tt.add_template("datanode", &template).unwrap();
|
||||
tt.add_template(subcommand, &template).unwrap();
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Context {
|
||||
@@ -244,32 +206,53 @@ impl Env {
|
||||
data_dir: String,
|
||||
}
|
||||
|
||||
let current_time = common_time::util::current_time_millis();
|
||||
let greptimedb_dir = format!("/tmp/greptimedb-datanode-{current_time}/");
|
||||
let greptimedb_dir = format!("/tmp/greptimedb-{subcommand}-{}", db_ctx.time);
|
||||
let ctx = Context {
|
||||
wal_dir: format!("{greptimedb_dir}/wal/"),
|
||||
data_dir: format!("{greptimedb_dir}/data/"),
|
||||
};
|
||||
let rendered = tt.render("datanode", &ctx).unwrap();
|
||||
let rendered = tt.render(subcommand, &ctx).unwrap();
|
||||
|
||||
let conf_file = format!("/tmp/datanode-{current_time}.toml");
|
||||
println!("Generating datanode config file in {conf_file}, full content:\n{rendered}");
|
||||
let conf_file = format!("/tmp/{subcommand}-{}.toml", db_ctx.time);
|
||||
println!("Generating {subcommand} config file in {conf_file}, full content:\n{rendered}");
|
||||
std::fs::write(&conf_file, rendered).unwrap();
|
||||
|
||||
conf_file
|
||||
}
|
||||
|
||||
/// Build the DB with `cargo build --bin greptime`
|
||||
async fn build_db() {
|
||||
println!("Going to build the DB...");
|
||||
let cargo_build_result = Command::new("cargo")
|
||||
.current_dir(util::get_workspace_root())
|
||||
.args(["build", "--bin", "greptime"])
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.await
|
||||
.expect("Failed to start GreptimeDB")
|
||||
.status;
|
||||
if !cargo_build_result.success() {
|
||||
panic!("Failed to build GreptimeDB (`cargo build` fails)");
|
||||
}
|
||||
println!("Build finished, starting...");
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GreptimeDB {
|
||||
server_process: Child,
|
||||
server_process: Mutex<Child>,
|
||||
metasrv_process: Option<Child>,
|
||||
datanode_process: Option<Child>,
|
||||
frontend_process: Option<Child>,
|
||||
client: Mutex<DB>,
|
||||
ctx: GreptimeDBContext,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for GreptimeDB {
|
||||
async fn query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
|
||||
async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
|
||||
if ctx.context.contains_key("restart") {
|
||||
Env::restart_server(self).await;
|
||||
}
|
||||
|
||||
let mut client = self.client.lock().await;
|
||||
if query.trim().starts_with("USE ") {
|
||||
let database = query
|
||||
@@ -285,6 +268,19 @@ impl Database for GreptimeDB {
|
||||
}
|
||||
}
|
||||
|
||||
struct GreptimeDBContext {
|
||||
/// Start time in millisecond
|
||||
time: i64,
|
||||
}
|
||||
|
||||
impl GreptimeDBContext {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
time: common_time::util::current_time_millis(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ResultDisplayer {
|
||||
result: Result<Output, ClientError>,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user