feat: add '--server-addr' in sqlness runner (#2692)

* feat: add '--server-addr' and '--mode' in sqlness runner

* chore: remove '--mode'

* refactor: add 'connect_db()'
This commit is contained in:
zyy17
2023-11-06 18:56:04 +08:00
committed by GitHub
parent 0a91335e24
commit cf94d3295f
2 changed files with 86 additions and 42 deletions

View File

@@ -41,8 +41,10 @@ const METASRV_ADDR: &str = "127.0.0.1:3002";
const SERVER_ADDR: &str = "127.0.0.1:4001";
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
#[derive(Clone)]
pub struct Env {
data_home: PathBuf,
server_addr: Option<String>,
}
#[allow(clippy::print_stdout)]
@@ -66,56 +68,86 @@ impl EnvController for Env {
#[allow(clippy::print_stdout)]
impl Env {
pub fn new(data_home: PathBuf) -> Self {
Self { data_home }
pub fn new(data_home: PathBuf, server_addr: Option<String>) -> Self {
Self {
data_home,
server_addr,
}
}
async fn start_standalone(&self) -> GreptimeDB {
Self::build_db().await;
if let Some(server_addr) = self.server_addr.clone() {
self.connect_db(&server_addr)
} else {
Self::build_db().await;
let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new();
let server_process = self.start_server("standalone", &db_ctx, true).await;
let server_process = self.start_server("standalone", &db_ctx, true).await;
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
server_processes: Arc::new(Mutex::new(vec![server_process])),
metasrv_process: None,
frontend_process: None,
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: true,
env: Env::new(self.data_home.clone()),
GreptimeDB {
server_processes: Some(Arc::new(Mutex::new(vec![server_process]))),
metasrv_process: None,
frontend_process: None,
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: true,
env: self.clone(),
}
}
}
async fn start_distributed(&self) -> GreptimeDB {
Self::build_db().await;
if let Some(server_addr) = self.server_addr.clone() {
self.connect_db(&server_addr)
} else {
Self::build_db().await;
let db_ctx = GreptimeDBContext::new();
let db_ctx = GreptimeDBContext::new();
// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;
// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;
let datanode_1 = self.start_server("datanode", &db_ctx, true).await;
let datanode_2 = self.start_server("datanode", &db_ctx, true).await;
let datanode_3 = self.start_server("datanode", &db_ctx, true).await;
let datanode_1 = self.start_server("datanode", &db_ctx, true).await;
let datanode_2 = self.start_server("datanode", &db_ctx, true).await;
let datanode_3 = self.start_server("datanode", &db_ctx, true).await;
let frontend = self.start_server("frontend", &db_ctx, true).await;
let frontend = self.start_server("frontend", &db_ctx, true).await;
let client = Client::with_urls(vec![SERVER_ADDR]);
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
server_processes: Some(Arc::new(Mutex::new(vec![
datanode_1, datanode_2, datanode_3,
]))),
metasrv_process: Some(meta_server),
frontend_process: Some(frontend),
client: TokioMutex::new(db),
ctx: db_ctx,
is_standalone: false,
env: self.clone(),
}
}
}
fn connect_db(&self, server_addr: &str) -> GreptimeDB {
let client = Client::with_urls(vec![server_addr.to_owned()]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
GreptimeDB {
server_processes: Arc::new(Mutex::new(vec![datanode_1, datanode_2, datanode_3])),
metasrv_process: Some(meta_server),
frontend_process: Some(frontend),
client: TokioMutex::new(db),
ctx: db_ctx,
server_processes: None,
metasrv_process: None,
frontend_process: None,
ctx: GreptimeDBContext {
time: 0,
datanode_id: Default::default(),
},
is_standalone: false,
env: Env::new(self.data_home.clone()),
env: self.clone(),
}
}
@@ -244,9 +276,11 @@ impl Env {
/// stop and restart the server process
async fn restart_server(&self, db: &GreptimeDB) {
{
let mut server_processes = db.server_processes.lock().unwrap();
for server_process in server_processes.iter_mut() {
Env::stop_server(server_process);
if let Some(server_process) = db.server_processes.clone() {
let mut server_processes = server_process.lock().unwrap();
for server_process in server_processes.iter_mut() {
Env::stop_server(server_process);
}
}
}
@@ -265,8 +299,10 @@ impl Env {
processes
};
let mut server_processes = db.server_processes.lock().unwrap();
*server_processes = new_server_processes;
if let Some(server_process) = db.server_processes.clone() {
let mut server_processes = server_process.lock().unwrap();
*server_processes = new_server_processes;
}
}
/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
@@ -332,7 +368,7 @@ impl Env {
}
pub struct GreptimeDB {
server_processes: Arc<Mutex<Vec<Child>>>,
server_processes: Option<Arc<Mutex<Vec<Child>>>>,
metasrv_process: Option<Child>,
frontend_process: Option<Child>,
client: TokioMutex<DB>,
@@ -344,7 +380,7 @@ pub struct GreptimeDB {
#[async_trait]
impl Database for GreptimeDB {
async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
if ctx.context.contains_key("restart") {
if ctx.context.contains_key("restart") && self.env.server_addr.is_none() {
self.env.restart_server(self).await;
}
@@ -383,9 +419,11 @@ impl Database for GreptimeDB {
impl GreptimeDB {
#![allow(clippy::print_stdout)]
fn stop(&mut self) {
let mut servers = self.server_processes.lock().unwrap();
for server in servers.iter_mut() {
Env::stop_server(server);
if let Some(server_processes) = self.server_processes.clone() {
let mut server_processes = server_processes.lock().unwrap();
for server_process in server_processes.iter_mut() {
Env::stop_server(server_process);
}
}
if let Some(mut metasrv) = self.metasrv_process.take() {
Env::stop_server(&mut metasrv);
@@ -399,7 +437,9 @@ impl GreptimeDB {
impl Drop for GreptimeDB {
fn drop(&mut self) {
self.stop();
if self.env.server_addr.is_none() {
self.stop();
}
}
}

View File

@@ -40,6 +40,10 @@ struct Args {
/// Name of test cases to run. Accept as a regexp.
#[clap(short, long, default_value = ".*")]
test_filter: String,
/// Address of the server
#[clap(short, long)]
server_addr: Option<String>,
}
#[tokio::main]
@@ -59,6 +63,6 @@ async fn main() {
.env_config_file(args.env_config_file)
.build()
.unwrap();
let runner = Runner::new(config, Env::new(data_home));
let runner = Runner::new(config, Env::new(data_home, args.server_addr));
runner.run().await.unwrap();
}