diff --git a/Cargo.lock b/Cargo.lock index 20cebde2d6..83832853b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10895,12 +10895,12 @@ dependencies = [ [[package]] name = "sqlness" version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "308a7338f2211813d6e9da117e9b9b7aee5d072872d11a934002fd2bd4ab5276" +source = "git+https://github.com/CeresDB/sqlness.git?rev=bb91f31ff58993e07ea89845791235138283a24c#bb91f31ff58993e07ea89845791235138283a24c" dependencies = [ "async-trait", "derive_builder 0.11.2", "duration-str", + "futures", "minijinja", "prettydiff", "regex", @@ -10926,6 +10926,7 @@ dependencies = [ "hex", "local-ip-address", "mysql", + "num_cpus", "reqwest", "serde", "serde_json", diff --git a/tests/cases/distributed/explain/analyze_append_table_count.result b/tests/cases/distributed/explain/analyze_append_table_count.result index 6dba2ad0cd..8ec58af215 100644 --- a/tests/cases/distributed/explain/analyze_append_table_count.result +++ b/tests/cases/distributed/explain/analyze_append_table_count.result @@ -49,3 +49,7 @@ EXPLAIN ANALYZE SELECT count(*) FROM test_table; |_|_| Total rows: 1_| +-+-+-+ +DROP TABLE test_table; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/explain/analyze_append_table_count.sql b/tests/cases/distributed/explain/analyze_append_table_count.sql index fcb2e33c61..2fe7adc9bf 100644 --- a/tests/cases/distributed/explain/analyze_append_table_count.sql +++ b/tests/cases/distributed/explain/analyze_append_table_count.sql @@ -28,3 +28,5 @@ SELECT count(*) FROM test_table; -- SQLNESS REPLACE (peers.*) REDACTED -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED EXPLAIN ANALYZE SELECT count(*) FROM test_table; + +DROP TABLE test_table; diff --git a/tests/cases/distributed/information_schema/cluster_info.result b/tests/cases/distributed/information_schema/cluster_info.result index adc75e65fe..e315f6ae99 100644 --- a/tests/cases/distributed/information_schema/cluster_info.result +++ b/tests/cases/distributed/information_schema/cluster_info.result @@ -22,50 +22,55 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO ORDER BY peer_type; -+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:29411|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:29412|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:29413|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|127.0.0.1:29680|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:29401|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:29302|Version|Hash|Start_time|Duration||+++++++++ ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||1|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||2|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|Address|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|Address|Version|Hash|Start_time|Duration|Duration||1|METASRV|Address|Version|Hash|Start_time|Duration||+++++++++ -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+\.\d+\s) Version -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; -+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:29302|Version|Hash|Start_time|Duration||+++++++++ ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|Address|Version|Hash|Start_time|Duration||+++++++++ -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+\.\d+\s) Version -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; -+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|FRONTEND|127.0.0.1:29401|Version|Hash|Start_time|Duration|Duration|+++++++++ ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|FRONTEND|Address|Version|Hash|Start_time|Duration|Duration|+++++++++ -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+\.\d+\s) Version -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; -+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:29411|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:29412|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:29413|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|127.0.0.1:29680|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:29302|Version|Hash|Start_time|Duration||+++++++++ ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||1|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||2|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|Address|Version|Hash|Start_time|Duration|Duration||1|METASRV|Address|Version|Hash|Start_time|Duration||+++++++++ -- SQLNESS REPLACE version node_version -- SQLNESS REPLACE (\s\d+\.\d+\.\d+\s) Version -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type; -+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|2|DATANODE|127.0.0.1:29412|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:29413|Version|Hash|Start_time|Duration|Duration|+++++++++ ++++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|2|DATANODE|Address|Version|Hash|Start_time|Duration|Duration|+++++++++ USE PUBLIC; diff --git a/tests/cases/distributed/information_schema/cluster_info.sql b/tests/cases/distributed/information_schema/cluster_info.sql index c9b254aa16..6338409dd6 100644 --- a/tests/cases/distributed/information_schema/cluster_info.sql +++ b/tests/cases/distributed/information_schema/cluster_info.sql @@ -7,6 +7,7 @@ DESC TABLE CLUSTER_INFO; -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO ORDER BY peer_type; @@ -15,6 +16,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type; -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; @@ -23,6 +25,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type; -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; @@ -31,6 +34,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; @@ -39,6 +43,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type; -- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash -- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time -- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration +-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address -- SQLNESS REPLACE [\s\-]+ SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type; diff --git a/tests/cases/standalone/common/ttl/show_ttl.result b/tests/cases/standalone/common/ttl/show_ttl.result index d98c1b612b..febf7191d7 100644 --- a/tests/cases/standalone/common/ttl/show_ttl.result +++ b/tests/cases/standalone/common/ttl/show_ttl.result @@ -372,3 +372,7 @@ ALTER DATABASE test_ttl_db_2 SET 'ttl' = 'instant'; Error: 1004(InvalidArguments), Invalid set database option, key: ttl, value: instant +DROP DATABASE test_ttl_db_2; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/ttl/show_ttl.sql b/tests/cases/standalone/common/ttl/show_ttl.sql index d226b96211..b9c67b440a 100644 --- a/tests/cases/standalone/common/ttl/show_ttl.sql +++ b/tests/cases/standalone/common/ttl/show_ttl.sql @@ -80,3 +80,5 @@ CREATE DATABASE test_ttl_db WITH (ttl = 'instant'); CREATE DATABASE test_ttl_db_2 WITH (ttl = '1s'); ALTER DATABASE test_ttl_db_2 SET 'ttl' = 'instant'; + +DROP DATABASE test_ttl_db_2; diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template index d820babf5c..20987eed9a 100644 --- a/tests/conf/datanode-test.toml.template +++ b/tests/conf/datanode-test.toml.template @@ -22,7 +22,7 @@ type = 'File' data_home = '{data_home}' [meta_client_options] -metasrv_addrs = ['127.0.0.1:29302'] +metasrv_addrs = ['{metasrv_addr}'] timeout_millis = 3000 connect_timeout_millis = 5000 tcp_nodelay = false diff --git a/tests/conf/standalone-test.toml.template b/tests/conf/standalone-test.toml.template index 262e106bbd..cbeb983652 100644 --- a/tests/conf/standalone-test.toml.template +++ b/tests/conf/standalone-test.toml.template @@ -21,12 +21,12 @@ type = 'File' data_home = '{data_home}' [grpc] -bind_addr = '127.0.0.1:29401' +bind_addr = '{grpc_addr}' runtime_size = 8 [mysql] enable = true -addr = "127.0.0.1:29402" +addr = "{mysql_addr}" runtime_size = 2 [mysql.tls] @@ -34,7 +34,7 @@ mode = "disable" [postgres] enable = true -addr = "127.0.0.1:29403" +addr = "{postgres_addr}" runtime_size = 2 [procedure] diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index 2aa54cd1dd..dfa28962f8 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -20,11 +20,13 @@ flate2 = "1.0" hex = "0.4" local-ip-address = "0.6" mysql = { version = "25.0.1", default-features = false, features = ["minimal", "rustls-tls"] } +num_cpus = "1.16" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } serde.workspace = true serde_json.workspace = true sha2 = "0.10" -sqlness = "0.6.1" # sqlness 0.6.0 have a bug causing `cargo sqlness` to fail(see https://github.com/CeresDB/sqlness/issues/68) which is fixed in 0.6.1 +# wait for https://github.com/CeresDB/sqlness/pull/71 +sqlness = { git = "https://github.com/CeresDB/sqlness.git", rev = "bb91f31ff58993e07ea89845791235138283a24c" } tar = "0.4" tempfile.workspace = true tinytemplate = "1.2" diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index d92bcb34c0..00d9816984 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -39,21 +39,22 @@ use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{StringVectorBuilder, VectorRef}; use mysql::prelude::Queryable; use mysql::{Conn as MySqlClient, Row as MySqlRow}; -use serde::Serialize; use sqlness::{Database, EnvController, QueryContext}; -use tinytemplate::TinyTemplate; use tokio::sync::Mutex as TokioMutex; use tokio_postgres::{Client as PgClient, SimpleQueryMessage as PgRow}; use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY}; +use crate::server_mode::ServerMode; use crate::util::{get_workspace_root, maybe_pull_binary, PROGRAM}; use crate::{util, ServerAddr}; -const METASRV_ADDR: &str = "127.0.0.1:29302"; -const GRPC_SERVER_ADDR: &str = "127.0.0.1:29401"; -const MYSQL_SERVER_ADDR: &str = "127.0.0.1:29402"; -const POSTGRES_SERVER_ADDR: &str = "127.0.0.1:29403"; -const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info"; +// standalone mode +const SERVER_MODE_STANDALONE_IDX: usize = 0; +// distributed mode +const SERVER_MODE_METASRV_IDX: usize = 0; +const SERVER_MODE_DATANODE_START_IDX: usize = 1; +const SERVER_MODE_FRONTEND_IDX: usize = 4; +const SERVER_MODE_FLOWNODE_IDX: usize = 5; #[derive(Clone)] pub enum WalConfig { @@ -95,11 +96,15 @@ pub struct Env { impl EnvController for Env { type DB = GreptimeDB; - async fn start(&self, mode: &str, _config: Option<&Path>) -> Self::DB { + async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB { + if self.server_addrs.server_addr.is_some() && id > 0 { + panic!("Parallel test mode is not supported when server address is already set."); + } + std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string()); match mode { - "standalone" => self.start_standalone().await, - "distributed" => self.start_distributed().await, + "standalone" => self.start_standalone(id).await, + "distributed" => self.start_distributed(id).await, _ => panic!("Unexpected mode: {mode}"), } } @@ -133,18 +138,23 @@ impl Env { } } - async fn start_standalone(&self) -> GreptimeDB { + async fn start_standalone(&self, id: usize) -> GreptimeDB { + println!("Starting standalone instance {id}"); + if self.server_addrs.server_addr.is_some() { - self.connect_db(&self.server_addrs).await + self.connect_db(&self.server_addrs, id).await } else { self.build_db(); self.setup_wal(); - let db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone()); + let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone()); - let server_process = self.start_server("standalone", &db_ctx, true).await; + let server_mode = ServerMode::random_standalone(); + db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX); + let server_addr = server_mode.server_addr().unwrap(); + let server_process = self.start_server(server_mode, &db_ctx, id, true).await; - let mut greptimedb = self.connect_db(&Default::default()).await; + let mut greptimedb = self.connect_db(&server_addr, id).await; greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process]))); greptimedb.is_standalone = true; greptimedb.ctx = db_ctx; @@ -153,29 +163,51 @@ impl Env { } } - async fn start_distributed(&self) -> GreptimeDB { + async fn start_distributed(&self, id: usize) -> GreptimeDB { if self.server_addrs.server_addr.is_some() { - self.connect_db(&self.server_addrs).await + self.connect_db(&self.server_addrs, id).await } else { self.build_db(); self.setup_wal(); self.setup_etcd(); self.setup_pg(); - let db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone()); + let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone()); // start a distributed GreptimeDB - let meta_server = self.start_server("metasrv", &db_ctx, true).await; + let meta_server_mode = ServerMode::random_metasrv(); + let metasrv_port = match &meta_server_mode { + ServerMode::Metasrv { rpc_server_addr, .. } => rpc_server_addr + .split(':') + .nth(1) + .unwrap() + .parse::() + .unwrap(), + _ => panic!("metasrv mode not set, maybe running in remote mode which doesn't support restart?"), + }; + db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX); + let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await; - let datanode_1 = self.start_server("datanode", &db_ctx, true).await; - let datanode_2 = self.start_server("datanode", &db_ctx, true).await; - let datanode_3 = self.start_server("datanode", &db_ctx, true).await; + let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0); + db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX); + let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await; + let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1); + db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1); + let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await; + let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2); + db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2); + let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await; - let frontend = self.start_server("frontend", &db_ctx, true).await; + let frontend_mode = ServerMode::random_frontend(metasrv_port); + let server_addr = frontend_mode.server_addr().unwrap(); + db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX); + let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await; - let flownode = self.start_server("flownode", &db_ctx, true).await; + let flownode_mode = ServerMode::random_flownode(metasrv_port, 0); + db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX); + let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await; - let mut greptimedb = self.connect_db(&Default::default()).await; + let mut greptimedb = self.connect_db(&server_addr, id).await; greptimedb.metasrv_process = Some(meta_server).into(); greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![ @@ -237,24 +269,14 @@ impl Env { panic!("Failed to connect to MySQL server. Please check if the server is running.") } - async fn connect_db(&self, server_addr: &ServerAddr) -> GreptimeDB { - let grpc_server_addr = server_addr - .server_addr - .clone() - .unwrap_or(GRPC_SERVER_ADDR.to_owned()); - let pg_server_addr = server_addr - .pg_server_addr - .clone() - .unwrap_or(POSTGRES_SERVER_ADDR.to_owned()); - let mysql_server_addr = server_addr - .mysql_server_addr - .clone() - .unwrap_or(MYSQL_SERVER_ADDR.to_owned()); + async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB { + let grpc_server_addr = server_addr.server_addr.clone().unwrap(); + let pg_server_addr = server_addr.pg_server_addr.clone().unwrap(); + let mysql_server_addr = server_addr.mysql_server_addr.clone().unwrap(); let grpc_client = Client::with_urls(vec![grpc_server_addr.clone()]); let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); let pg_client = self.create_pg_client(&pg_server_addr).await; - let mysql_client = self.create_mysql_client(&mysql_server_addr).await; GreptimeDB { @@ -270,9 +292,11 @@ impl Env { datanode_id: Default::default(), wal: self.wal.clone(), store_config: self.store_config.clone(), + server_modes: Vec::new(), }, is_standalone: false, env: self.clone(), + id, } } @@ -283,25 +307,24 @@ impl Env { async fn start_server( &self, - subcommand: &str, + mode: ServerMode, db_ctx: &GreptimeDBContext, + id: usize, truncate_log: bool, ) -> Child { - let log_file_name = match subcommand { - "datanode" => { + let log_file_name = match mode { + ServerMode::Datanode { node_id, .. } => { db_ctx.incr_datanode_id(); - format!("greptime-sqlness-datanode-{}.log", db_ctx.datanode_id()) + format!("greptime-{}-sqlness-datanode-{}.log", id, node_id) } - // The flownode id is always 0 for now - "flownode" => "greptime-sqlness-flownode.log".to_string(), - "frontend" => "greptime-sqlness-frontend.log".to_string(), - "metasrv" => "greptime-sqlness-metasrv.log".to_string(), - "standalone" => "greptime-sqlness-standalone.log".to_string(), - _ => panic!("Unexpected subcommand: {subcommand}"), + ServerMode::Flownode { .. } => format!("greptime-{}-sqlness-flownode.log", id), + ServerMode::Frontend { .. } => format!("greptime-{}-sqlness-frontend.log", id), + ServerMode::Metasrv { .. } => format!("greptime-{}-sqlness-metasrv.log", id), + ServerMode::Standalone { .. } => format!("greptime-{}-sqlness-standalone.log", id), }; let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string(); - println!("{subcommand} log file at {stdout_file_name}"); + println!("DB instance {id} log file at {stdout_file_name}"); let stdout_file = OpenOptions::new() .create(true) @@ -311,103 +334,8 @@ impl Env { .open(stdout_file_name) .unwrap(); - let (args, check_ip_addrs) = match subcommand { - "datanode" => { - let (args, addr) = self.datanode_start_args(db_ctx); - (args, vec![addr]) - } - "flownode" => { - let (args, addr) = self.flownode_start_args(db_ctx, &self.sqlness_home); - (args, vec![addr]) - } - "standalone" => { - let args = vec![ - DEFAULT_LOG_LEVEL.to_string(), - subcommand.to_string(), - "start".to_string(), - format!( - "--log-dir={}/greptimedb-flownode/logs", - self.sqlness_home.display() - ), - "-c".to_string(), - self.generate_config_file(subcommand, db_ctx), - "--http-addr=127.0.0.1:29502".to_string(), - ]; - ( - args, - vec![ - GRPC_SERVER_ADDR.to_string(), - MYSQL_SERVER_ADDR.to_string(), - POSTGRES_SERVER_ADDR.to_string(), - ], - ) - } - "frontend" => { - let args = vec![ - DEFAULT_LOG_LEVEL.to_string(), - subcommand.to_string(), - "start".to_string(), - "--metasrv-addrs=127.0.0.1:29302".to_string(), - "--http-addr=127.0.0.1:29503".to_string(), - format!("--rpc-bind-addr={}", GRPC_SERVER_ADDR), - format!("--mysql-addr={}", MYSQL_SERVER_ADDR), - format!("--postgres-addr={}", POSTGRES_SERVER_ADDR), - format!( - "--log-dir={}/greptimedb-frontend/logs", - self.sqlness_home.display() - ), - "-c".to_string(), - self.generate_config_file(subcommand, db_ctx), - ]; - ( - args, - vec![ - GRPC_SERVER_ADDR.to_string(), - MYSQL_SERVER_ADDR.to_string(), - POSTGRES_SERVER_ADDR.to_string(), - ], - ) - } - "metasrv" => { - let mut args = vec![ - DEFAULT_LOG_LEVEL.to_string(), - subcommand.to_string(), - "start".to_string(), - "--rpc-bind-addr".to_string(), - "127.0.0.1:29302".to_string(), - "--rpc-server-addr".to_string(), - "127.0.0.1:29302".to_string(), - "--enable-region-failover".to_string(), - "false".to_string(), - "--http-addr=127.0.0.1:29502".to_string(), - format!( - "--log-dir={}/greptimedb-metasrv/logs", - self.sqlness_home.display() - ), - "-c".to_string(), - self.generate_config_file(subcommand, db_ctx), - ]; - if db_ctx.store_config().setup_pg { - let client_ports = self - .store_config - .store_addrs - .iter() - .map(|s| s.split(':').nth(1).unwrap().parse::().unwrap()) - .collect::>(); - let client_port = client_ports.first().unwrap_or(&5432); - let pg_server_addr = format!( - "postgresql://greptimedb:admin@127.0.0.1:{}/postgres", - client_port - ); - args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]); - args.extend(vec!["--store-addrs".to_string(), pg_server_addr]); - } else if db_ctx.store_config().store_addrs.is_empty() { - args.extend(vec!["--backend".to_string(), "memory-store".to_string()]) - } - (args, vec![METASRV_ADDR.to_string()]) - } - _ => panic!("Unexpected subcommand: {subcommand}"), - }; + let args = mode.get_args(&self.sqlness_home, self, db_ctx, id); + let check_ip_addrs = mode.check_addrs(); for check_ip_addr in &check_ip_addrs { if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await { @@ -431,69 +359,23 @@ impl Env { .stdout(stdout_file) .spawn() .unwrap_or_else(|error| { - panic!("Failed to start the DB with subcommand {subcommand},Error: {error}, path: {:?}", bins_dir.join(program)); + panic!( + "Failed to start the DB with subcommand {}, Error: {error}, path: {:?}", + mode.name(), + bins_dir.join(program) + ); }); for check_ip_addr in &check_ip_addrs { if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await { Env::stop_server(&mut process); - panic!("{subcommand} doesn't up in 10 seconds, quit.") + panic!("{} doesn't up in 10 seconds, quit.", mode.name()) } } process } - fn datanode_start_args(&self, db_ctx: &GreptimeDBContext) -> (Vec, String) { - let id = db_ctx.datanode_id(); - - let data_home = self - .sqlness_home - .join(format!("greptimedb_datanode_{}_{id}", db_ctx.time)); - - let subcommand = "datanode"; - let mut args = vec![ - DEFAULT_LOG_LEVEL.to_string(), - subcommand.to_string(), - "start".to_string(), - ]; - args.push(format!("--rpc-bind-addr=127.0.0.1:2941{id}")); - args.push(format!("--rpc-server-addr=127.0.0.1:2941{id}")); - args.push(format!("--http-addr=127.0.0.1:2943{id}")); - args.push(format!("--data-home={}", data_home.display())); - args.push(format!("--log-dir={}/logs", data_home.display())); - args.push(format!("--node-id={id}")); - args.push("-c".to_string()); - args.push(self.generate_config_file(subcommand, db_ctx)); - args.push("--metasrv-addrs=127.0.0.1:29302".to_string()); - (args, format!("127.0.0.1:2941{id}")) - } - - fn flownode_start_args( - &self, - _db_ctx: &GreptimeDBContext, - sqlness_home: &Path, - ) -> (Vec, String) { - let id = 0; - - let subcommand = "flownode"; - let mut args = vec![ - DEFAULT_LOG_LEVEL.to_string(), - subcommand.to_string(), - "start".to_string(), - ]; - args.push(format!("--rpc-bind-addr=127.0.0.1:2968{id}")); - args.push(format!("--rpc-server-addr=127.0.0.1:2968{id}")); - args.push(format!("--node-id={id}")); - args.push(format!( - "--log-dir={}/greptimedb-flownode/logs", - sqlness_home.display() - )); - args.push("--metasrv-addrs=127.0.0.1:29302".to_string()); - args.push(format!("--http-addr=127.0.0.1:2951{id}")); - (args, format!("127.0.0.1:2968{id}")) - } - /// stop and restart the server process async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) { { @@ -526,12 +408,30 @@ impl Env { // check if the server is distributed or standalone let new_server_processes = if db.is_standalone { - let new_server_process = self.start_server("standalone", &db.ctx, false).await; + let server_mode = db + .ctx + .get_server_mode(SERVER_MODE_STANDALONE_IDX) + .cloned() + .unwrap(); + let server_addr = server_mode.server_addr().unwrap(); + let new_server_process = self.start_server(server_mode, &db.ctx, db.id, false).await; + + *db.pg_client.lock().await = self + .create_pg_client(&server_addr.pg_server_addr.unwrap()) + .await; + *db.mysql_client.lock().await = self + .create_mysql_client(&server_addr.mysql_server_addr.unwrap()) + .await; vec![new_server_process] } else { db.ctx.reset_datanode_id(); if is_full_restart { - let metasrv = self.start_server("metasrv", &db.ctx, false).await; + let metasrv_mode = db + .ctx + .get_server_mode(SERVER_MODE_METASRV_IDX) + .cloned() + .unwrap(); + let metasrv = self.start_server(metasrv_mode, &db.ctx, db.id, false).await; db.metasrv_process .lock() .expect("lock poisoned") @@ -543,19 +443,41 @@ impl Env { } let mut processes = vec![]; - for _ in 0..3 { - let new_server_process = self.start_server("datanode", &db.ctx, false).await; + for i in 0..3 { + let datanode_mode = db + .ctx + .get_server_mode(SERVER_MODE_DATANODE_START_IDX + i) + .cloned() + .unwrap(); + let new_server_process = self + .start_server(datanode_mode, &db.ctx, db.id, false) + .await; processes.push(new_server_process); } if is_full_restart { - let frontend = self.start_server("frontend", &db.ctx, false).await; + let frontend_mode = db + .ctx + .get_server_mode(SERVER_MODE_FRONTEND_IDX) + .cloned() + .unwrap(); + let frontend = self + .start_server(frontend_mode, &db.ctx, db.id, false) + .await; db.frontend_process .lock() .expect("lock poisoned") .replace(frontend); } - let flownode = self.start_server("flownode", &db.ctx, false).await; + + let flownode_mode = db + .ctx + .get_server_mode(SERVER_MODE_FLOWNODE_IDX) + .cloned() + .unwrap(); + let flownode = self + .start_server(flownode_mode, &db.ctx, db.id, false) + .await; db.flownode_process .lock() .expect("lock poisoned") @@ -564,11 +486,8 @@ impl Env { processes }; - if let Some(server_process) = db.server_processes.clone() { - *db.pg_client.lock().await = self.create_pg_client(&self.pg_server_addr()).await; - *db.mysql_client.lock().await = - self.create_mysql_client(&self.mysql_server_addr()).await; - let mut server_processes = server_process.lock().unwrap(); + if let Some(server_processes) = db.server_processes.clone() { + let mut server_processes = server_processes.lock().unwrap(); *server_processes = new_server_processes; } } @@ -607,73 +526,6 @@ impl Env { } } - /// Generate config file to `/tmp/{subcommand}-{current_time}.toml` - fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String { - let mut tt = TinyTemplate::new(); - - let mut path = util::sqlness_conf_path(); - path.push(format!("{subcommand}-test.toml.template")); - let template = std::fs::read_to_string(path).unwrap(); - tt.add_template(subcommand, &template).unwrap(); - - #[derive(Serialize)] - struct Context { - wal_dir: String, - data_home: String, - procedure_dir: String, - is_raft_engine: bool, - kafka_wal_broker_endpoints: String, - use_etcd: bool, - store_addrs: String, - } - - let data_home = self.sqlness_home.join(format!("greptimedb-{subcommand}")); - std::fs::create_dir_all(data_home.as_path()).unwrap(); - - let wal_dir = data_home.join("wal").display().to_string(); - let procedure_dir = data_home.join("procedure").display().to_string(); - let ctx = Context { - wal_dir, - data_home: data_home.display().to_string(), - procedure_dir, - is_raft_engine: db_ctx.is_raft_engine(), - kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(), - use_etcd: !self.store_config.store_addrs.is_empty(), - store_addrs: self - .store_config - .store_addrs - .clone() - .iter() - .map(|p| format!("\"{p}\"")) - .collect::>() - .join(","), - }; - let rendered = tt.render(subcommand, &ctx).unwrap(); - - let conf_file = data_home - .join(format!("{subcommand}-{}.toml", db_ctx.time)) - .display() - .to_string(); - println!("Generating {subcommand} config file in {conf_file}, full content:\n{rendered}"); - std::fs::write(&conf_file, rendered).unwrap(); - - conf_file - } - - fn pg_server_addr(&self) -> String { - self.server_addrs - .pg_server_addr - .clone() - .unwrap_or(POSTGRES_SERVER_ADDR.to_owned()) - } - - fn mysql_server_addr(&self) -> String { - self.server_addrs - .mysql_server_addr - .clone() - .unwrap_or(MYSQL_SERVER_ADDR.to_owned()) - } - /// Build the DB with `cargo build --bin greptime` fn build_db(&self) { if self.bins_dir.lock().unwrap().is_some() { @@ -714,6 +566,7 @@ pub struct GreptimeDB { ctx: GreptimeDBContext, is_standalone: bool, env: Env, + id: usize, } impl GreptimeDB { @@ -914,12 +767,13 @@ impl Drop for GreptimeDB { } } -struct GreptimeDBContext { +pub struct GreptimeDBContext { /// Start time in millisecond time: i64, datanode_id: AtomicU32, wal: WalConfig, store_config: StoreConfig, + server_modes: Vec, } impl GreptimeDBContext { @@ -929,14 +783,19 @@ impl GreptimeDBContext { datanode_id: AtomicU32::new(0), wal, store_config, + server_modes: Vec::new(), } } - fn is_raft_engine(&self) -> bool { + pub(crate) fn time(&self) -> i64 { + self.time + } + + pub fn is_raft_engine(&self) -> bool { matches!(self.wal, WalConfig::RaftEngine) } - fn kafka_wal_broker_endpoints(&self) -> String { + pub fn kafka_wal_broker_endpoints(&self) -> String { match &self.wal { WalConfig::RaftEngine => String::new(), WalConfig::Kafka { @@ -949,17 +808,24 @@ impl GreptimeDBContext { let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed); } - fn datanode_id(&self) -> u32 { - self.datanode_id.load(Ordering::Relaxed) - } - fn reset_datanode_id(&self) { self.datanode_id.store(0, Ordering::Relaxed); } - fn store_config(&self) -> StoreConfig { + pub(crate) fn store_config(&self) -> StoreConfig { self.store_config.clone() } + + fn set_server_mode(&mut self, mode: ServerMode, idx: usize) { + if idx >= self.server_modes.len() { + self.server_modes.resize(idx + 1, mode.clone()); + } + self.server_modes[idx] = mode; + } + + fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> { + self.server_modes.get(idx) + } } struct ResultDisplayer { diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs index d9b43ef401..b8ce851f1f 100644 --- a/tests/runner/src/main.rs +++ b/tests/runner/src/main.rs @@ -26,6 +26,7 @@ use crate::env::StoreConfig; mod env; mod protocol_interceptor; +mod server_mode; mod util; #[derive(ValueEnum, Debug, Clone)] @@ -110,11 +111,15 @@ struct Args { /// Whether to setup pg, by default it is false. #[clap(long, default_value = "false")] setup_pg: bool, + + /// The number of jobs to run in parallel. Default to half of the cores. + #[clap(short, long, default_value = "0")] + jobs: usize, } #[tokio::main] async fn main() { - let args = Args::parse(); + let mut args = Args::parse(); let temp_dir = tempfile::Builder::new() .prefix("sqlness") @@ -133,6 +138,20 @@ async fn main() { panic!("{} is not a directory", d.display()); } } + if args.jobs == 0 { + args.jobs = num_cpus::get() / 2; + } + + // normalize parallelism to 1 if any of the following conditions are met: + if args.server_addr.server_addr.is_some() + || args.setup_etcd + || args.setup_pg + || args.kafka_wal_broker_endpoints.is_some() + { + args.jobs = 1; + println!("Normalizing parallelism to 1 due to server addresses or etcd/pg setup"); + } + let config = ConfigBuilder::default() .case_dir(util::get_case_dir(args.case_dir)) .fail_fast(args.fail_fast) @@ -140,6 +159,7 @@ async fn main() { .follow_links(true) .env_config_file(args.env_config_file) .interceptor_registry(interceptor_registry) + .parallelism(args.jobs) .build() .unwrap(); diff --git a/tests/runner/src/server_mode.rs b/tests/runner/src/server_mode.rs new file mode 100644 index 0000000000..b59156d33c --- /dev/null +++ b/tests/runner/src/server_mode.rs @@ -0,0 +1,464 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::path::Path; + +use serde::Serialize; +use tinytemplate::TinyTemplate; + +use crate::env::{Env, GreptimeDBContext}; +use crate::{util, ServerAddr}; + +const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info"; + +#[derive(Clone)] +pub enum ServerMode { + Standalone { + http_addr: String, + rpc_bind_addr: String, + mysql_addr: String, + postgres_addr: String, + }, + Frontend { + http_addr: String, + rpc_bind_addr: String, + mysql_addr: String, + postgres_addr: String, + metasrv_addr: String, + }, + Metasrv { + rpc_bind_addr: String, + rpc_server_addr: String, + http_addr: String, + }, + Datanode { + rpc_bind_addr: String, + rpc_server_addr: String, + http_addr: String, + metasrv_addr: String, + node_id: u32, + }, + Flownode { + rpc_bind_addr: String, + rpc_server_addr: String, + http_addr: String, + metasrv_addr: String, + node_id: u32, + }, +} + +#[derive(Serialize)] +struct ConfigContext { + wal_dir: String, + data_home: String, + procedure_dir: String, + is_raft_engine: bool, + kafka_wal_broker_endpoints: String, + use_etcd: bool, + store_addrs: String, + instance_id: usize, + // for following addrs, leave it empty if not needed + // required for datanode + metasrv_addr: String, + // for frontend and standalone + grpc_addr: String, + // for standalone + mysql_addr: String, + // for standalone + postgres_addr: String, +} + +impl ServerMode { + pub fn random_standalone() -> Self { + let http_port = util::get_random_port(); + let rpc_port = util::get_random_port(); + let mysql_port = util::get_random_port(); + let postgres_port = util::get_random_port(); + + ServerMode::Standalone { + http_addr: format!("127.0.0.1:{http_port}"), + rpc_bind_addr: format!("127.0.0.1:{rpc_port}"), + mysql_addr: format!("127.0.0.1:{mysql_port}"), + postgres_addr: format!("127.0.0.1:{postgres_port}"), + } + } + + pub fn random_frontend(metasrv_port: u16) -> Self { + let http_port = util::get_random_port(); + let rpc_port = util::get_random_port(); + let mysql_port = util::get_random_port(); + let postgres_port = util::get_random_port(); + + ServerMode::Frontend { + http_addr: format!("127.0.0.1:{http_port}"), + rpc_bind_addr: format!("127.0.0.1:{rpc_port}"), + mysql_addr: format!("127.0.0.1:{mysql_port}"), + postgres_addr: format!("127.0.0.1:{postgres_port}"), + metasrv_addr: format!("127.0.0.1:{metasrv_port}"), + } + } + + pub fn random_metasrv() -> Self { + let bind_port = util::get_random_port(); + let http_port = util::get_random_port(); + + ServerMode::Metasrv { + rpc_bind_addr: format!("127.0.0.1:{bind_port}"), + rpc_server_addr: format!("127.0.0.1:{bind_port}"), + http_addr: format!("127.0.0.1:{http_port}"), + } + } + + pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self { + let rpc_port = util::get_random_port(); + let http_port = util::get_random_port(); + + ServerMode::Datanode { + rpc_bind_addr: format!("127.0.0.1:{rpc_port}"), + rpc_server_addr: format!("127.0.0.1:{rpc_port}"), + http_addr: format!("127.0.0.1:{http_port}"), + metasrv_addr: format!("127.0.0.1:{metasrv_port}"), + node_id, + } + } + + pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self { + let rpc_port = util::get_random_port(); + let http_port = util::get_random_port(); + + ServerMode::Flownode { + rpc_bind_addr: format!("127.0.0.1:{rpc_port}"), + rpc_server_addr: format!("127.0.0.1:{rpc_port}"), + http_addr: format!("127.0.0.1:{http_port}"), + metasrv_addr: format!("127.0.0.1:{metasrv_port}"), + node_id, + } + } + + pub fn name(&self) -> &'static str { + match self { + ServerMode::Standalone { .. } => "standalone", + ServerMode::Frontend { .. } => "frontend", + ServerMode::Metasrv { .. } => "metasrv", + ServerMode::Datanode { .. } => "datanode", + ServerMode::Flownode { .. } => "flownode", + } + } + + /// Returns the addresses of the server that needed to be checked. + pub fn check_addrs(&self) -> Vec { + match self { + ServerMode::Standalone { + rpc_bind_addr, + mysql_addr, + postgres_addr, + http_addr, + .. + } => { + vec![ + rpc_bind_addr.clone(), + mysql_addr.clone(), + postgres_addr.clone(), + http_addr.clone(), + ] + } + ServerMode::Frontend { + rpc_bind_addr, + mysql_addr, + postgres_addr, + .. + } => { + vec![ + rpc_bind_addr.clone(), + mysql_addr.clone(), + postgres_addr.clone(), + ] + } + ServerMode::Metasrv { rpc_bind_addr, .. } => { + vec![rpc_bind_addr.clone()] + } + ServerMode::Datanode { rpc_bind_addr, .. } => { + vec![rpc_bind_addr.clone()] + } + ServerMode::Flownode { rpc_bind_addr, .. } => { + vec![rpc_bind_addr.clone()] + } + } + } + + /// Returns the server addresses to connect. Only standalone and frontend mode have this. + pub fn server_addr(&self) -> Option { + match self { + ServerMode::Standalone { + rpc_bind_addr, + mysql_addr, + postgres_addr, + .. + } => Some(ServerAddr { + server_addr: Some(rpc_bind_addr.clone()), + pg_server_addr: Some(postgres_addr.clone()), + mysql_server_addr: Some(mysql_addr.clone()), + }), + ServerMode::Frontend { + rpc_bind_addr, + mysql_addr, + postgres_addr, + .. + } => Some(ServerAddr { + server_addr: Some(rpc_bind_addr.clone()), + pg_server_addr: Some(postgres_addr.clone()), + mysql_server_addr: Some(mysql_addr.clone()), + }), + _ => None, + } + } + + pub fn generate_config_file( + &self, + sqlness_home: &Path, + db_ctx: &GreptimeDBContext, + id: usize, + ) -> String { + let mut tt = TinyTemplate::new(); + + let mut path = util::sqlness_conf_path(); + path.push(format!("{}-test.toml.template", self.name())); + let template = std::fs::read_to_string(path).unwrap(); + tt.add_template(self.name(), &template).unwrap(); + + let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name())); + std::fs::create_dir_all(data_home.as_path()).unwrap(); + + let wal_dir = data_home.join("wal").display().to_string(); + let procedure_dir = data_home.join("procedure").display().to_string(); + + // Get the required addresses based on server mode + let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self { + ServerMode::Standalone { + rpc_bind_addr, + mysql_addr, + postgres_addr, + .. + } => ( + String::new(), + rpc_bind_addr.clone(), + mysql_addr.clone(), + postgres_addr.clone(), + ), + ServerMode::Frontend { + rpc_bind_addr, + mysql_addr, + postgres_addr, + .. + } => ( + String::new(), + rpc_bind_addr.clone(), + mysql_addr.clone(), + postgres_addr.clone(), + ), + ServerMode::Datanode { + rpc_bind_addr, + metasrv_addr, + .. + } => ( + metasrv_addr.clone(), + rpc_bind_addr.clone(), + String::new(), + String::new(), + ), + _ => (String::new(), String::new(), String::new(), String::new()), + }; + + let ctx = ConfigContext { + wal_dir, + data_home: data_home.display().to_string(), + procedure_dir, + is_raft_engine: db_ctx.is_raft_engine(), + kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(), + use_etcd: !db_ctx.store_config().store_addrs.is_empty(), + store_addrs: db_ctx + .store_config() + .store_addrs + .iter() + .map(|p| format!("\"{p}\"")) + .collect::>() + .join(","), + instance_id: id, + metasrv_addr, + grpc_addr, + mysql_addr, + postgres_addr, + }; + + let rendered = tt.render(self.name(), &ctx).unwrap(); + + let conf_file = data_home + .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time())) + .display() + .to_string(); + println!( + "Generating id {}, {} config file in {conf_file}, full content:\n{rendered}", + id, + self.name() + ); + std::fs::write(&conf_file, rendered).unwrap(); + + conf_file + } + + pub fn get_args( + &self, + sqlness_home: &Path, + _env: &Env, + db_ctx: &GreptimeDBContext, + id: usize, + ) -> Vec { + let mut args = vec![ + DEFAULT_LOG_LEVEL.to_string(), + self.name().to_string(), + "start".to_string(), + ]; + + match self { + ServerMode::Standalone { + http_addr, + rpc_bind_addr, + mysql_addr, + postgres_addr, + } => { + args.extend([ + format!( + "--log-dir={}/greptimedb-{}-standalone/logs", + sqlness_home.display(), + id + ), + "-c".to_string(), + self.generate_config_file(sqlness_home, db_ctx, id), + format!("--http-addr={http_addr}"), + format!("--rpc-addr={rpc_bind_addr}"), + format!("--mysql-addr={mysql_addr}"), + format!("--postgres-addr={postgres_addr}"), + ]); + } + ServerMode::Frontend { + http_addr, + rpc_bind_addr, + mysql_addr, + postgres_addr, + metasrv_addr, + } => { + args.extend([ + format!("--metasrv-addrs={metasrv_addr}"), + format!("--http-addr={http_addr}"), + format!("--rpc-addr={rpc_bind_addr}"), + format!("--mysql-addr={mysql_addr}"), + format!("--postgres-addr={postgres_addr}"), + format!( + "--log-dir={}/greptimedb-{}-frontend/logs", + sqlness_home.display(), + id + ), + "-c".to_string(), + self.generate_config_file(sqlness_home, db_ctx, id), + ]); + } + ServerMode::Metasrv { + rpc_bind_addr, + rpc_server_addr, + http_addr, + } => { + args.extend([ + "--bind-addr".to_string(), + rpc_bind_addr.clone(), + "--server-addr".to_string(), + rpc_server_addr.clone(), + "--enable-region-failover".to_string(), + "false".to_string(), + format!("--http-addr={http_addr}"), + format!( + "--log-dir={}/greptimedb-{}-metasrv/logs", + sqlness_home.display(), + id + ), + "-c".to_string(), + self.generate_config_file(sqlness_home, db_ctx, id), + ]); + + if db_ctx.store_config().setup_pg { + let client_ports = db_ctx + .store_config() + .store_addrs + .iter() + .map(|s| s.split(':').nth(1).unwrap().parse::().unwrap()) + .collect::>(); + let client_port = client_ports.first().unwrap_or(&5432); + let pg_server_addr = format!( + "postgresql://greptimedb:admin@127.0.0.1:{}/postgres", + client_port + ); + args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]); + args.extend(vec!["--store-addrs".to_string(), pg_server_addr]); + } else if db_ctx.store_config().store_addrs.is_empty() { + args.extend(vec!["--backend".to_string(), "memory-store".to_string()]) + } + } + ServerMode::Datanode { + rpc_bind_addr, + rpc_server_addr, + http_addr, + metasrv_addr, + node_id, + } => { + let data_home = sqlness_home.join(format!( + "greptimedb_{}_datanode_{}_{node_id}", + id, + db_ctx.time() + )); + args.extend([ + format!("--rpc-addr={rpc_bind_addr}"), + format!("--rpc-server-addr={rpc_server_addr}"), + format!("--http-addr={http_addr}"), + format!("--data-home={}", data_home.display()), + format!("--log-dir={}/logs", data_home.display()), + format!("--node-id={node_id}"), + "-c".to_string(), + self.generate_config_file(sqlness_home, db_ctx, id), + format!("--metasrv-addrs={metasrv_addr}"), + ]); + } + ServerMode::Flownode { + rpc_bind_addr, + rpc_server_addr, + http_addr, + metasrv_addr, + node_id, + } => { + args.extend([ + format!("--rpc-addr={rpc_bind_addr}"), + format!("--rpc-server-addr={rpc_server_addr}"), + format!("--node-id={node_id}"), + format!( + "--log-dir={}/greptimedb-{}-flownode/logs", + sqlness_home.display(), + id + ), + format!("--metasrv-addrs={metasrv_addr}"), + format!("--http-addr={http_addr}"), + ]); + } + } + + args + } +} diff --git a/tests/runner/src/util.rs b/tests/runner/src/util.rs index 5baa0cd80e..b7c688bec8 100644 --- a/tests/runner/src/util.rs +++ b/tests/runner/src/util.rs @@ -465,3 +465,13 @@ pub fn teardown_wal() { println!("kafka cluster is down"); } + +/// Get a random available port by binding to port 0 +pub fn get_random_port() -> u16 { + use std::net::TcpListener; + let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind to random port"); + listener + .local_addr() + .expect("Failed to get local address") + .port() +}