feat: run sqlness in parallel (#5499)

* define server mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* bump sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* all good

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: Move config generation logic from Env to ServerMode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finalize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change license header

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* override parallelism

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename more variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-02-20 23:05:19 -08:00
committed by GitHub
parent 6f0783e17e
commit d7b6718be0
14 changed files with 683 additions and 298 deletions

5
Cargo.lock generated
View File

@@ -10895,12 +10895,12 @@ dependencies = [
[[package]]
name = "sqlness"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308a7338f2211813d6e9da117e9b9b7aee5d072872d11a934002fd2bd4ab5276"
source = "git+https://github.com/CeresDB/sqlness.git?rev=bb91f31ff58993e07ea89845791235138283a24c#bb91f31ff58993e07ea89845791235138283a24c"
dependencies = [
"async-trait",
"derive_builder 0.11.2",
"duration-str",
"futures",
"minijinja",
"prettydiff",
"regex",
@@ -10926,6 +10926,7 @@ dependencies = [
"hex",
"local-ip-address",
"mysql",
"num_cpus",
"reqwest",
"serde",
"serde_json",

View File

@@ -49,3 +49,7 @@ EXPLAIN ANALYZE SELECT count(*) FROM test_table;
|_|_| Total rows: 1_|
+-+-+-+
DROP TABLE test_table;
Affected Rows: 0

View File

@@ -28,3 +28,5 @@ SELECT count(*) FROM test_table;
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT count(*) FROM test_table;
DROP TABLE test_table;

View File

@@ -22,50 +22,55 @@ DESC TABLE CLUSTER_INFO;
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:29411|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:29412|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:29413|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|127.0.0.1:29680|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:29401|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:29302|Version|Hash|Start_time|Duration||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||1|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||2|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|Address|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|Address|Version|Hash|Start_time|Duration|Duration||1|METASRV|Address|Version|Hash|Start_time|Duration||+++++++++
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d+\.\d+\.\d+\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:29302|Version|Hash|Start_time|Duration||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|Address|Version|Hash|Start_time|Duration||+++++++++
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d+\.\d+\.\d+\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|FRONTEND|127.0.0.1:29401|Version|Hash|Start_time|Duration|Duration|+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|FRONTEND|Address|Version|Hash|Start_time|Duration|Duration|+++++++++
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d+\.\d+\.\d+\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:29411|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:29412|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:29413|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|127.0.0.1:29680|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:29302|Version|Hash|Start_time|Duration||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|0|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||1|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||2|DATANODE|Address|Version|Hash|Start_time|Duration|Duration||0|FLOWNODE|Address|Version|Hash|Start_time|Duration|Duration||1|METASRV|Address|Version|Hash|Start_time|Duration||+++++++++
-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d+\.\d+\.\d+\s) Version
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type;
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|2|DATANODE|127.0.0.1:29412|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:29413|Version|Hash|Start_time|Duration|Duration|+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|2|DATANODE|Address|Version|Hash|Start_time|Duration|Duration|+++++++++
USE PUBLIC;

View File

@@ -7,6 +7,7 @@ DESC TABLE CLUSTER_INFO;
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
@@ -15,6 +16,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
@@ -23,6 +25,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
@@ -31,6 +34,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
@@ -39,6 +43,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
-- SQLNESS REPLACE (\s[a-z0-9]{7,8}\s) Hash
-- SQLNESS REPLACE (\s[\-0-9T:\.]{19,}) Start_time
-- SQLNESS REPLACE ((\d+(s|ms|m)\s)+) Duration
-- SQLNESS REPLACE (\s127\.0\.0\.1:\d+\s) Address
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_ID > 1 ORDER BY peer_type;

View File

@@ -372,3 +372,7 @@ ALTER DATABASE test_ttl_db_2 SET 'ttl' = 'instant';
Error: 1004(InvalidArguments), Invalid set database option, key: ttl, value: instant
DROP DATABASE test_ttl_db_2;
Affected Rows: 0

View File

@@ -80,3 +80,5 @@ CREATE DATABASE test_ttl_db WITH (ttl = 'instant');
CREATE DATABASE test_ttl_db_2 WITH (ttl = '1s');
ALTER DATABASE test_ttl_db_2 SET 'ttl' = 'instant';
DROP DATABASE test_ttl_db_2;

View File

@@ -22,7 +22,7 @@ type = 'File'
data_home = '{data_home}'
[meta_client_options]
metasrv_addrs = ['127.0.0.1:29302']
metasrv_addrs = ['{metasrv_addr}']
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = false

View File

@@ -21,12 +21,12 @@ type = 'File'
data_home = '{data_home}'
[grpc]
bind_addr = '127.0.0.1:29401'
bind_addr = '{grpc_addr}'
runtime_size = 8
[mysql]
enable = true
addr = "127.0.0.1:29402"
addr = "{mysql_addr}"
runtime_size = 2
[mysql.tls]
@@ -34,7 +34,7 @@ mode = "disable"
[postgres]
enable = true
addr = "127.0.0.1:29403"
addr = "{postgres_addr}"
runtime_size = 2
[procedure]

View File

@@ -20,11 +20,13 @@ flate2 = "1.0"
hex = "0.4"
local-ip-address = "0.6"
mysql = { version = "25.0.1", default-features = false, features = ["minimal", "rustls-tls"] }
num_cpus = "1.16"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
serde.workspace = true
serde_json.workspace = true
sha2 = "0.10"
sqlness = "0.6.1" # sqlness 0.6.0 have a bug causing `cargo sqlness` to fail(see https://github.com/CeresDB/sqlness/issues/68) which is fixed in 0.6.1
# wait for https://github.com/CeresDB/sqlness/pull/71
sqlness = { git = "https://github.com/CeresDB/sqlness.git", rev = "bb91f31ff58993e07ea89845791235138283a24c" }
tar = "0.4"
tempfile.workspace = true
tinytemplate = "1.2"

View File

@@ -39,21 +39,22 @@ use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use mysql::prelude::Queryable;
use mysql::{Conn as MySqlClient, Row as MySqlRow};
use serde::Serialize;
use sqlness::{Database, EnvController, QueryContext};
use tinytemplate::TinyTemplate;
use tokio::sync::Mutex as TokioMutex;
use tokio_postgres::{Client as PgClient, SimpleQueryMessage as PgRow};
use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
use crate::server_mode::ServerMode;
use crate::util::{get_workspace_root, maybe_pull_binary, PROGRAM};
use crate::{util, ServerAddr};
const METASRV_ADDR: &str = "127.0.0.1:29302";
const GRPC_SERVER_ADDR: &str = "127.0.0.1:29401";
const MYSQL_SERVER_ADDR: &str = "127.0.0.1:29402";
const POSTGRES_SERVER_ADDR: &str = "127.0.0.1:29403";
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
// standalone mode
const SERVER_MODE_STANDALONE_IDX: usize = 0;
// distributed mode
const SERVER_MODE_METASRV_IDX: usize = 0;
const SERVER_MODE_DATANODE_START_IDX: usize = 1;
const SERVER_MODE_FRONTEND_IDX: usize = 4;
const SERVER_MODE_FLOWNODE_IDX: usize = 5;
#[derive(Clone)]
pub enum WalConfig {
@@ -95,11 +96,15 @@ pub struct Env {
impl EnvController for Env {
type DB = GreptimeDB;
async fn start(&self, mode: &str, _config: Option<&Path>) -> Self::DB {
async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
if self.server_addrs.server_addr.is_some() && id > 0 {
panic!("Parallel test mode is not supported when server address is already set.");
}
std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string());
match mode {
"standalone" => self.start_standalone().await,
"distributed" => self.start_distributed().await,
"standalone" => self.start_standalone(id).await,
"distributed" => self.start_distributed(id).await,
_ => panic!("Unexpected mode: {mode}"),
}
}
@@ -133,18 +138,23 @@ impl Env {
}
}
async fn start_standalone(&self) -> GreptimeDB {
async fn start_standalone(&self, id: usize) -> GreptimeDB {
println!("Starting standalone instance {id}");
if self.server_addrs.server_addr.is_some() {
self.connect_db(&self.server_addrs).await
self.connect_db(&self.server_addrs, id).await
} else {
self.build_db();
self.setup_wal();
let db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
let server_process = self.start_server("standalone", &db_ctx, true).await;
let server_mode = ServerMode::random_standalone();
db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX);
let server_addr = server_mode.server_addr().unwrap();
let server_process = self.start_server(server_mode, &db_ctx, id, true).await;
let mut greptimedb = self.connect_db(&Default::default()).await;
let mut greptimedb = self.connect_db(&server_addr, id).await;
greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process])));
greptimedb.is_standalone = true;
greptimedb.ctx = db_ctx;
@@ -153,29 +163,51 @@ impl Env {
}
}
async fn start_distributed(&self) -> GreptimeDB {
async fn start_distributed(&self, id: usize) -> GreptimeDB {
if self.server_addrs.server_addr.is_some() {
self.connect_db(&self.server_addrs).await
self.connect_db(&self.server_addrs, id).await
} else {
self.build_db();
self.setup_wal();
self.setup_etcd();
self.setup_pg();
let db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
// start a distributed GreptimeDB
let meta_server = self.start_server("metasrv", &db_ctx, true).await;
let meta_server_mode = ServerMode::random_metasrv();
let metasrv_port = match &meta_server_mode {
ServerMode::Metasrv { rpc_server_addr, .. } => rpc_server_addr
.split(':')
.nth(1)
.unwrap()
.parse::<u16>()
.unwrap(),
_ => panic!("metasrv mode not set, maybe running in remote mode which doesn't support restart?"),
};
db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX);
let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await;
let datanode_1 = self.start_server("datanode", &db_ctx, true).await;
let datanode_2 = self.start_server("datanode", &db_ctx, true).await;
let datanode_3 = self.start_server("datanode", &db_ctx, true).await;
let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0);
db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX);
let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await;
let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1);
db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1);
let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await;
let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2);
db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2);
let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await;
let frontend = self.start_server("frontend", &db_ctx, true).await;
let frontend_mode = ServerMode::random_frontend(metasrv_port);
let server_addr = frontend_mode.server_addr().unwrap();
db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX);
let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await;
let flownode = self.start_server("flownode", &db_ctx, true).await;
let flownode_mode = ServerMode::random_flownode(metasrv_port, 0);
db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX);
let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await;
let mut greptimedb = self.connect_db(&Default::default()).await;
let mut greptimedb = self.connect_db(&server_addr, id).await;
greptimedb.metasrv_process = Some(meta_server).into();
greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![
@@ -237,24 +269,14 @@ impl Env {
panic!("Failed to connect to MySQL server. Please check if the server is running.")
}
async fn connect_db(&self, server_addr: &ServerAddr) -> GreptimeDB {
let grpc_server_addr = server_addr
.server_addr
.clone()
.unwrap_or(GRPC_SERVER_ADDR.to_owned());
let pg_server_addr = server_addr
.pg_server_addr
.clone()
.unwrap_or(POSTGRES_SERVER_ADDR.to_owned());
let mysql_server_addr = server_addr
.mysql_server_addr
.clone()
.unwrap_or(MYSQL_SERVER_ADDR.to_owned());
async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB {
let grpc_server_addr = server_addr.server_addr.clone().unwrap();
let pg_server_addr = server_addr.pg_server_addr.clone().unwrap();
let mysql_server_addr = server_addr.mysql_server_addr.clone().unwrap();
let grpc_client = Client::with_urls(vec![grpc_server_addr.clone()]);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client);
let pg_client = self.create_pg_client(&pg_server_addr).await;
let mysql_client = self.create_mysql_client(&mysql_server_addr).await;
GreptimeDB {
@@ -270,9 +292,11 @@ impl Env {
datanode_id: Default::default(),
wal: self.wal.clone(),
store_config: self.store_config.clone(),
server_modes: Vec::new(),
},
is_standalone: false,
env: self.clone(),
id,
}
}
@@ -283,25 +307,24 @@ impl Env {
async fn start_server(
&self,
subcommand: &str,
mode: ServerMode,
db_ctx: &GreptimeDBContext,
id: usize,
truncate_log: bool,
) -> Child {
let log_file_name = match subcommand {
"datanode" => {
let log_file_name = match mode {
ServerMode::Datanode { node_id, .. } => {
db_ctx.incr_datanode_id();
format!("greptime-sqlness-datanode-{}.log", db_ctx.datanode_id())
format!("greptime-{}-sqlness-datanode-{}.log", id, node_id)
}
// The flownode id is always 0 for now
"flownode" => "greptime-sqlness-flownode.log".to_string(),
"frontend" => "greptime-sqlness-frontend.log".to_string(),
"metasrv" => "greptime-sqlness-metasrv.log".to_string(),
"standalone" => "greptime-sqlness-standalone.log".to_string(),
_ => panic!("Unexpected subcommand: {subcommand}"),
ServerMode::Flownode { .. } => format!("greptime-{}-sqlness-flownode.log", id),
ServerMode::Frontend { .. } => format!("greptime-{}-sqlness-frontend.log", id),
ServerMode::Metasrv { .. } => format!("greptime-{}-sqlness-metasrv.log", id),
ServerMode::Standalone { .. } => format!("greptime-{}-sqlness-standalone.log", id),
};
let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string();
println!("{subcommand} log file at {stdout_file_name}");
println!("DB instance {id} log file at {stdout_file_name}");
let stdout_file = OpenOptions::new()
.create(true)
@@ -311,103 +334,8 @@ impl Env {
.open(stdout_file_name)
.unwrap();
let (args, check_ip_addrs) = match subcommand {
"datanode" => {
let (args, addr) = self.datanode_start_args(db_ctx);
(args, vec![addr])
}
"flownode" => {
let (args, addr) = self.flownode_start_args(db_ctx, &self.sqlness_home);
(args, vec![addr])
}
"standalone" => {
let args = vec![
DEFAULT_LOG_LEVEL.to_string(),
subcommand.to_string(),
"start".to_string(),
format!(
"--log-dir={}/greptimedb-flownode/logs",
self.sqlness_home.display()
),
"-c".to_string(),
self.generate_config_file(subcommand, db_ctx),
"--http-addr=127.0.0.1:29502".to_string(),
];
(
args,
vec![
GRPC_SERVER_ADDR.to_string(),
MYSQL_SERVER_ADDR.to_string(),
POSTGRES_SERVER_ADDR.to_string(),
],
)
}
"frontend" => {
let args = vec![
DEFAULT_LOG_LEVEL.to_string(),
subcommand.to_string(),
"start".to_string(),
"--metasrv-addrs=127.0.0.1:29302".to_string(),
"--http-addr=127.0.0.1:29503".to_string(),
format!("--rpc-bind-addr={}", GRPC_SERVER_ADDR),
format!("--mysql-addr={}", MYSQL_SERVER_ADDR),
format!("--postgres-addr={}", POSTGRES_SERVER_ADDR),
format!(
"--log-dir={}/greptimedb-frontend/logs",
self.sqlness_home.display()
),
"-c".to_string(),
self.generate_config_file(subcommand, db_ctx),
];
(
args,
vec![
GRPC_SERVER_ADDR.to_string(),
MYSQL_SERVER_ADDR.to_string(),
POSTGRES_SERVER_ADDR.to_string(),
],
)
}
"metasrv" => {
let mut args = vec![
DEFAULT_LOG_LEVEL.to_string(),
subcommand.to_string(),
"start".to_string(),
"--rpc-bind-addr".to_string(),
"127.0.0.1:29302".to_string(),
"--rpc-server-addr".to_string(),
"127.0.0.1:29302".to_string(),
"--enable-region-failover".to_string(),
"false".to_string(),
"--http-addr=127.0.0.1:29502".to_string(),
format!(
"--log-dir={}/greptimedb-metasrv/logs",
self.sqlness_home.display()
),
"-c".to_string(),
self.generate_config_file(subcommand, db_ctx),
];
if db_ctx.store_config().setup_pg {
let client_ports = self
.store_config
.store_addrs
.iter()
.map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
.collect::<Vec<_>>();
let client_port = client_ports.first().unwrap_or(&5432);
let pg_server_addr = format!(
"postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
client_port
);
args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
} else if db_ctx.store_config().store_addrs.is_empty() {
args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
}
(args, vec![METASRV_ADDR.to_string()])
}
_ => panic!("Unexpected subcommand: {subcommand}"),
};
let args = mode.get_args(&self.sqlness_home, self, db_ctx, id);
let check_ip_addrs = mode.check_addrs();
for check_ip_addr in &check_ip_addrs {
if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await {
@@ -431,69 +359,23 @@ impl Env {
.stdout(stdout_file)
.spawn()
.unwrap_or_else(|error| {
panic!("Failed to start the DB with subcommand {subcommand},Error: {error}, path: {:?}", bins_dir.join(program));
panic!(
"Failed to start the DB with subcommand {}, Error: {error}, path: {:?}",
mode.name(),
bins_dir.join(program)
);
});
for check_ip_addr in &check_ip_addrs {
if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(10)).await {
Env::stop_server(&mut process);
panic!("{subcommand} doesn't up in 10 seconds, quit.")
panic!("{} doesn't up in 10 seconds, quit.", mode.name())
}
}
process
}
fn datanode_start_args(&self, db_ctx: &GreptimeDBContext) -> (Vec<String>, String) {
let id = db_ctx.datanode_id();
let data_home = self
.sqlness_home
.join(format!("greptimedb_datanode_{}_{id}", db_ctx.time));
let subcommand = "datanode";
let mut args = vec![
DEFAULT_LOG_LEVEL.to_string(),
subcommand.to_string(),
"start".to_string(),
];
args.push(format!("--rpc-bind-addr=127.0.0.1:2941{id}"));
args.push(format!("--rpc-server-addr=127.0.0.1:2941{id}"));
args.push(format!("--http-addr=127.0.0.1:2943{id}"));
args.push(format!("--data-home={}", data_home.display()));
args.push(format!("--log-dir={}/logs", data_home.display()));
args.push(format!("--node-id={id}"));
args.push("-c".to_string());
args.push(self.generate_config_file(subcommand, db_ctx));
args.push("--metasrv-addrs=127.0.0.1:29302".to_string());
(args, format!("127.0.0.1:2941{id}"))
}
fn flownode_start_args(
&self,
_db_ctx: &GreptimeDBContext,
sqlness_home: &Path,
) -> (Vec<String>, String) {
let id = 0;
let subcommand = "flownode";
let mut args = vec![
DEFAULT_LOG_LEVEL.to_string(),
subcommand.to_string(),
"start".to_string(),
];
args.push(format!("--rpc-bind-addr=127.0.0.1:2968{id}"));
args.push(format!("--rpc-server-addr=127.0.0.1:2968{id}"));
args.push(format!("--node-id={id}"));
args.push(format!(
"--log-dir={}/greptimedb-flownode/logs",
sqlness_home.display()
));
args.push("--metasrv-addrs=127.0.0.1:29302".to_string());
args.push(format!("--http-addr=127.0.0.1:2951{id}"));
(args, format!("127.0.0.1:2968{id}"))
}
/// stop and restart the server process
async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) {
{
@@ -526,12 +408,30 @@ impl Env {
// check if the server is distributed or standalone
let new_server_processes = if db.is_standalone {
let new_server_process = self.start_server("standalone", &db.ctx, false).await;
let server_mode = db
.ctx
.get_server_mode(SERVER_MODE_STANDALONE_IDX)
.cloned()
.unwrap();
let server_addr = server_mode.server_addr().unwrap();
let new_server_process = self.start_server(server_mode, &db.ctx, db.id, false).await;
*db.pg_client.lock().await = self
.create_pg_client(&server_addr.pg_server_addr.unwrap())
.await;
*db.mysql_client.lock().await = self
.create_mysql_client(&server_addr.mysql_server_addr.unwrap())
.await;
vec![new_server_process]
} else {
db.ctx.reset_datanode_id();
if is_full_restart {
let metasrv = self.start_server("metasrv", &db.ctx, false).await;
let metasrv_mode = db
.ctx
.get_server_mode(SERVER_MODE_METASRV_IDX)
.cloned()
.unwrap();
let metasrv = self.start_server(metasrv_mode, &db.ctx, db.id, false).await;
db.metasrv_process
.lock()
.expect("lock poisoned")
@@ -543,19 +443,41 @@ impl Env {
}
let mut processes = vec![];
for _ in 0..3 {
let new_server_process = self.start_server("datanode", &db.ctx, false).await;
for i in 0..3 {
let datanode_mode = db
.ctx
.get_server_mode(SERVER_MODE_DATANODE_START_IDX + i)
.cloned()
.unwrap();
let new_server_process = self
.start_server(datanode_mode, &db.ctx, db.id, false)
.await;
processes.push(new_server_process);
}
if is_full_restart {
let frontend = self.start_server("frontend", &db.ctx, false).await;
let frontend_mode = db
.ctx
.get_server_mode(SERVER_MODE_FRONTEND_IDX)
.cloned()
.unwrap();
let frontend = self
.start_server(frontend_mode, &db.ctx, db.id, false)
.await;
db.frontend_process
.lock()
.expect("lock poisoned")
.replace(frontend);
}
let flownode = self.start_server("flownode", &db.ctx, false).await;
let flownode_mode = db
.ctx
.get_server_mode(SERVER_MODE_FLOWNODE_IDX)
.cloned()
.unwrap();
let flownode = self
.start_server(flownode_mode, &db.ctx, db.id, false)
.await;
db.flownode_process
.lock()
.expect("lock poisoned")
@@ -564,11 +486,8 @@ impl Env {
processes
};
if let Some(server_process) = db.server_processes.clone() {
*db.pg_client.lock().await = self.create_pg_client(&self.pg_server_addr()).await;
*db.mysql_client.lock().await =
self.create_mysql_client(&self.mysql_server_addr()).await;
let mut server_processes = server_process.lock().unwrap();
if let Some(server_processes) = db.server_processes.clone() {
let mut server_processes = server_processes.lock().unwrap();
*server_processes = new_server_processes;
}
}
@@ -607,73 +526,6 @@ impl Env {
}
}
/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
let mut tt = TinyTemplate::new();
let mut path = util::sqlness_conf_path();
path.push(format!("{subcommand}-test.toml.template"));
let template = std::fs::read_to_string(path).unwrap();
tt.add_template(subcommand, &template).unwrap();
#[derive(Serialize)]
struct Context {
wal_dir: String,
data_home: String,
procedure_dir: String,
is_raft_engine: bool,
kafka_wal_broker_endpoints: String,
use_etcd: bool,
store_addrs: String,
}
let data_home = self.sqlness_home.join(format!("greptimedb-{subcommand}"));
std::fs::create_dir_all(data_home.as_path()).unwrap();
let wal_dir = data_home.join("wal").display().to_string();
let procedure_dir = data_home.join("procedure").display().to_string();
let ctx = Context {
wal_dir,
data_home: data_home.display().to_string(),
procedure_dir,
is_raft_engine: db_ctx.is_raft_engine(),
kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
use_etcd: !self.store_config.store_addrs.is_empty(),
store_addrs: self
.store_config
.store_addrs
.clone()
.iter()
.map(|p| format!("\"{p}\""))
.collect::<Vec<_>>()
.join(","),
};
let rendered = tt.render(subcommand, &ctx).unwrap();
let conf_file = data_home
.join(format!("{subcommand}-{}.toml", db_ctx.time))
.display()
.to_string();
println!("Generating {subcommand} config file in {conf_file}, full content:\n{rendered}");
std::fs::write(&conf_file, rendered).unwrap();
conf_file
}
fn pg_server_addr(&self) -> String {
self.server_addrs
.pg_server_addr
.clone()
.unwrap_or(POSTGRES_SERVER_ADDR.to_owned())
}
fn mysql_server_addr(&self) -> String {
self.server_addrs
.mysql_server_addr
.clone()
.unwrap_or(MYSQL_SERVER_ADDR.to_owned())
}
/// Build the DB with `cargo build --bin greptime`
fn build_db(&self) {
if self.bins_dir.lock().unwrap().is_some() {
@@ -714,6 +566,7 @@ pub struct GreptimeDB {
ctx: GreptimeDBContext,
is_standalone: bool,
env: Env,
id: usize,
}
impl GreptimeDB {
@@ -914,12 +767,13 @@ impl Drop for GreptimeDB {
}
}
struct GreptimeDBContext {
pub struct GreptimeDBContext {
/// Start time in millisecond
time: i64,
datanode_id: AtomicU32,
wal: WalConfig,
store_config: StoreConfig,
server_modes: Vec<ServerMode>,
}
impl GreptimeDBContext {
@@ -929,14 +783,19 @@ impl GreptimeDBContext {
datanode_id: AtomicU32::new(0),
wal,
store_config,
server_modes: Vec::new(),
}
}
fn is_raft_engine(&self) -> bool {
pub(crate) fn time(&self) -> i64 {
self.time
}
pub fn is_raft_engine(&self) -> bool {
matches!(self.wal, WalConfig::RaftEngine)
}
fn kafka_wal_broker_endpoints(&self) -> String {
pub fn kafka_wal_broker_endpoints(&self) -> String {
match &self.wal {
WalConfig::RaftEngine => String::new(),
WalConfig::Kafka {
@@ -949,17 +808,24 @@ impl GreptimeDBContext {
let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed);
}
fn datanode_id(&self) -> u32 {
self.datanode_id.load(Ordering::Relaxed)
}
fn reset_datanode_id(&self) {
self.datanode_id.store(0, Ordering::Relaxed);
}
fn store_config(&self) -> StoreConfig {
pub(crate) fn store_config(&self) -> StoreConfig {
self.store_config.clone()
}
fn set_server_mode(&mut self, mode: ServerMode, idx: usize) {
if idx >= self.server_modes.len() {
self.server_modes.resize(idx + 1, mode.clone());
}
self.server_modes[idx] = mode;
}
fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> {
self.server_modes.get(idx)
}
}
struct ResultDisplayer {

View File

@@ -26,6 +26,7 @@ use crate::env::StoreConfig;
mod env;
mod protocol_interceptor;
mod server_mode;
mod util;
#[derive(ValueEnum, Debug, Clone)]
@@ -110,11 +111,15 @@ struct Args {
/// Whether to setup pg, by default it is false.
#[clap(long, default_value = "false")]
setup_pg: bool,
/// The number of jobs to run in parallel. Default to half of the cores.
#[clap(short, long, default_value = "0")]
jobs: usize,
}
#[tokio::main]
async fn main() {
let args = Args::parse();
let mut args = Args::parse();
let temp_dir = tempfile::Builder::new()
.prefix("sqlness")
@@ -133,6 +138,20 @@ async fn main() {
panic!("{} is not a directory", d.display());
}
}
if args.jobs == 0 {
args.jobs = num_cpus::get() / 2;
}
// normalize parallelism to 1 if any of the following conditions are met:
if args.server_addr.server_addr.is_some()
|| args.setup_etcd
|| args.setup_pg
|| args.kafka_wal_broker_endpoints.is_some()
{
args.jobs = 1;
println!("Normalizing parallelism to 1 due to server addresses or etcd/pg setup");
}
let config = ConfigBuilder::default()
.case_dir(util::get_case_dir(args.case_dir))
.fail_fast(args.fail_fast)
@@ -140,6 +159,7 @@ async fn main() {
.follow_links(true)
.env_config_file(args.env_config_file)
.interceptor_registry(interceptor_registry)
.parallelism(args.jobs)
.build()
.unwrap();

View File

@@ -0,0 +1,464 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use serde::Serialize;
use tinytemplate::TinyTemplate;
use crate::env::{Env, GreptimeDBContext};
use crate::{util, ServerAddr};
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
/// The launch mode of a GreptimeDB process under sqlness, together with the
/// loopback addresses allocated for it.
///
/// All addresses are `host:port` strings. `rpc_bind_addr` is the address the
/// gRPC server listens on; `rpc_server_addr` is the address advertised to
/// other components — in the `random_*` constructors below both are built
/// from the same port, so they only differ conceptually here.
#[derive(Clone)]
pub enum ServerMode {
    // Single-process deployment: serves HTTP, gRPC, MySQL and PostgreSQL.
    Standalone {
        http_addr: String,
        rpc_bind_addr: String,
        mysql_addr: String,
        postgres_addr: String,
    },
    // Distributed frontend; connects to a metasrv at `metasrv_addr`.
    Frontend {
        http_addr: String,
        rpc_bind_addr: String,
        mysql_addr: String,
        postgres_addr: String,
        metasrv_addr: String,
    },
    // Metadata server.
    Metasrv {
        rpc_bind_addr: String,
        rpc_server_addr: String,
        http_addr: String,
    },
    // Datanode `node_id`, registered with the metasrv at `metasrv_addr`.
    Datanode {
        rpc_bind_addr: String,
        rpc_server_addr: String,
        http_addr: String,
        metasrv_addr: String,
        node_id: u32,
    },
    // Flownode `node_id`, registered with the metasrv at `metasrv_addr`.
    Flownode {
        rpc_bind_addr: String,
        rpc_server_addr: String,
        http_addr: String,
        metasrv_addr: String,
        node_id: u32,
    },
}
/// Template context serialized into the per-mode `*-test.toml.template`
/// config templates (see [`ServerMode::generate_config_file`]).
///
/// Fields that do not apply to a given mode are left as empty strings so the
/// same struct can drive every template.
#[derive(Serialize)]
struct ConfigContext {
    wal_dir: String,
    data_home: String,
    procedure_dir: String,
    // True when the WAL backend is raft-engine (as opposed to Kafka).
    is_raft_engine: bool,
    kafka_wal_broker_endpoints: String,
    // True when external store addresses are configured; the name suggests
    // etcd, but it is derived solely from `store_addrs` being non-empty.
    use_etcd: bool,
    // Comma-joined, double-quoted store addresses ready for TOML embedding.
    store_addrs: String,
    // Index of this test instance; used to keep parallel runs isolated.
    instance_id: usize,
    // for following addrs, leave it empty if not needed
    // required for datanode
    metasrv_addr: String,
    // for frontend and standalone
    grpc_addr: String,
    // for standalone
    mysql_addr: String,
    // for standalone
    postgres_addr: String,
}
impl ServerMode {
    /// Builds a standalone-mode descriptor with four freshly allocated
    /// loopback ports (HTTP, gRPC, MySQL, PostgreSQL).
    ///
    /// NOTE(review): ports come from `util::get_random_port`, which binds to
    /// port 0 and releases it — another process could in principle grab a
    /// port before the server starts.
    pub fn random_standalone() -> Self {
        let http_port = util::get_random_port();
        let rpc_port = util::get_random_port();
        let mysql_port = util::get_random_port();
        let postgres_port = util::get_random_port();
        ServerMode::Standalone {
            http_addr: format!("127.0.0.1:{http_port}"),
            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
            mysql_addr: format!("127.0.0.1:{mysql_port}"),
            postgres_addr: format!("127.0.0.1:{postgres_port}"),
        }
    }

    /// Builds a frontend-mode descriptor with random ports, pointed at a
    /// metasrv listening on `127.0.0.1:{metasrv_port}`.
    pub fn random_frontend(metasrv_port: u16) -> Self {
        let http_port = util::get_random_port();
        let rpc_port = util::get_random_port();
        let mysql_port = util::get_random_port();
        let postgres_port = util::get_random_port();
        ServerMode::Frontend {
            http_addr: format!("127.0.0.1:{http_port}"),
            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
            mysql_addr: format!("127.0.0.1:{mysql_port}"),
            postgres_addr: format!("127.0.0.1:{postgres_port}"),
            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
        }
    }

    /// Builds a metasrv-mode descriptor. The bind and advertised (server)
    /// RPC addresses share the same random port.
    pub fn random_metasrv() -> Self {
        let bind_port = util::get_random_port();
        let http_port = util::get_random_port();
        ServerMode::Metasrv {
            rpc_bind_addr: format!("127.0.0.1:{bind_port}"),
            rpc_server_addr: format!("127.0.0.1:{bind_port}"),
            http_addr: format!("127.0.0.1:{http_port}"),
        }
    }

    /// Builds a datanode-mode descriptor for `node_id`, pointed at a metasrv
    /// on `127.0.0.1:{metasrv_port}`. Bind and advertised RPC addresses
    /// share one random port.
    pub fn random_datanode(metasrv_port: u16, node_id: u32) -> Self {
        let rpc_port = util::get_random_port();
        let http_port = util::get_random_port();
        ServerMode::Datanode {
            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
            http_addr: format!("127.0.0.1:{http_port}"),
            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
            node_id,
        }
    }

    /// Builds a flownode-mode descriptor for `node_id`, pointed at a metasrv
    /// on `127.0.0.1:{metasrv_port}`.
    pub fn random_flownode(metasrv_port: u16, node_id: u32) -> Self {
        let rpc_port = util::get_random_port();
        let http_port = util::get_random_port();
        ServerMode::Flownode {
            rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
            rpc_server_addr: format!("127.0.0.1:{rpc_port}"),
            http_addr: format!("127.0.0.1:{http_port}"),
            metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
            node_id,
        }
    }

    /// The mode's name as used both for the `greptime` subcommand and for
    /// locating the `{name}-test.toml.template` config template.
    pub fn name(&self) -> &'static str {
        match self {
            ServerMode::Standalone { .. } => "standalone",
            ServerMode::Frontend { .. } => "frontend",
            ServerMode::Metasrv { .. } => "metasrv",
            ServerMode::Datanode { .. } => "datanode",
            ServerMode::Flownode { .. } => "flownode",
        }
    }

    /// Returns the addresses of the server that needed to be checked.
    pub fn check_addrs(&self) -> Vec<String> {
        match self {
            // Standalone exposes every protocol, so all four endpoints are
            // health-checked; other modes only expose their RPC endpoint
            // (frontend additionally MySQL/PostgreSQL).
            ServerMode::Standalone {
                rpc_bind_addr,
                mysql_addr,
                postgres_addr,
                http_addr,
                ..
            } => {
                vec![
                    rpc_bind_addr.clone(),
                    mysql_addr.clone(),
                    postgres_addr.clone(),
                    http_addr.clone(),
                ]
            }
            ServerMode::Frontend {
                rpc_bind_addr,
                mysql_addr,
                postgres_addr,
                ..
            } => {
                vec![
                    rpc_bind_addr.clone(),
                    mysql_addr.clone(),
                    postgres_addr.clone(),
                ]
            }
            ServerMode::Metasrv { rpc_bind_addr, .. } => {
                vec![rpc_bind_addr.clone()]
            }
            ServerMode::Datanode { rpc_bind_addr, .. } => {
                vec![rpc_bind_addr.clone()]
            }
            ServerMode::Flownode { rpc_bind_addr, .. } => {
                vec![rpc_bind_addr.clone()]
            }
        }
    }

    /// Returns the server addresses to connect. Only standalone and frontend mode have this.
    pub fn server_addr(&self) -> Option<ServerAddr> {
        match self {
            ServerMode::Standalone {
                rpc_bind_addr,
                mysql_addr,
                postgres_addr,
                ..
            } => Some(ServerAddr {
                server_addr: Some(rpc_bind_addr.clone()),
                pg_server_addr: Some(postgres_addr.clone()),
                mysql_server_addr: Some(mysql_addr.clone()),
            }),
            ServerMode::Frontend {
                rpc_bind_addr,
                mysql_addr,
                postgres_addr,
                ..
            } => Some(ServerAddr {
                server_addr: Some(rpc_bind_addr.clone()),
                pg_server_addr: Some(postgres_addr.clone()),
                mysql_server_addr: Some(mysql_addr.clone()),
            }),
            // Metasrv/datanode/flownode are not client-facing.
            _ => None,
        }
    }

    /// Renders this mode's `{name}-test.toml.template` into a concrete TOML
    /// file under `sqlness_home` and returns the written file's path.
    ///
    /// `id` is the test-instance index: it is baked into both the data home
    /// directory (`greptimedb-{id}-{name}`) and the config file name so that
    /// parallel instances do not collide.
    ///
    /// # Panics
    /// Panics (`unwrap`) on any template-read, render, directory-creation or
    /// file-write failure — acceptable in this test harness.
    pub fn generate_config_file(
        &self,
        sqlness_home: &Path,
        db_ctx: &GreptimeDBContext,
        id: usize,
    ) -> String {
        let mut tt = TinyTemplate::new();
        let mut path = util::sqlness_conf_path();
        path.push(format!("{}-test.toml.template", self.name()));
        let template = std::fs::read_to_string(path).unwrap();
        tt.add_template(self.name(), &template).unwrap();
        let data_home = sqlness_home.join(format!("greptimedb-{}-{}", id, self.name()));
        std::fs::create_dir_all(data_home.as_path()).unwrap();
        let wal_dir = data_home.join("wal").display().to_string();
        let procedure_dir = data_home.join("procedure").display().to_string();
        // Get the required addresses based on server mode
        let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self {
            ServerMode::Standalone {
                rpc_bind_addr,
                mysql_addr,
                postgres_addr,
                ..
            } => (
                String::new(),
                rpc_bind_addr.clone(),
                mysql_addr.clone(),
                postgres_addr.clone(),
            ),
            ServerMode::Frontend {
                rpc_bind_addr,
                mysql_addr,
                postgres_addr,
                ..
            } => (
                String::new(),
                rpc_bind_addr.clone(),
                mysql_addr.clone(),
                postgres_addr.clone(),
            ),
            ServerMode::Datanode {
                rpc_bind_addr,
                metasrv_addr,
                ..
            } => (
                metasrv_addr.clone(),
                rpc_bind_addr.clone(),
                String::new(),
                String::new(),
            ),
            // Metasrv/flownode templates take their addresses from CLI args,
            // not from the rendered config — leave all fields empty.
            _ => (String::new(), String::new(), String::new(), String::new()),
        };
        let ctx = ConfigContext {
            wal_dir,
            data_home: data_home.display().to_string(),
            procedure_dir,
            is_raft_engine: db_ctx.is_raft_engine(),
            kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(),
            // "use_etcd" really means "any external store is configured";
            // pg-backed stores also flow through `store_addrs`.
            use_etcd: !db_ctx.store_config().store_addrs.is_empty(),
            store_addrs: db_ctx
                .store_config()
                .store_addrs
                .iter()
                .map(|p| format!("\"{p}\""))
                .collect::<Vec<_>>()
                .join(","),
            instance_id: id,
            metasrv_addr,
            grpc_addr,
            mysql_addr,
            postgres_addr,
        };
        let rendered = tt.render(self.name(), &ctx).unwrap();
        // `db_ctx.time()` in the file name keeps restarts from reusing stale
        // config files within the same data home.
        let conf_file = data_home
            .join(format!("{}-{}-{}.toml", self.name(), id, db_ctx.time()))
            .display()
            .to_string();
        println!(
            "Generating id {}, {} config file in {conf_file}, full content:\n{rendered}",
            id,
            self.name()
        );
        std::fs::write(&conf_file, rendered).unwrap();
        conf_file
    }

    /// Assembles the full `greptime` CLI argument list for launching this
    /// server mode: log level, subcommand (`self.name()`), `start`, then the
    /// mode-specific flags. Also triggers config-file generation via
    /// [`Self::generate_config_file`] for every mode except flownode.
    ///
    /// `_env` is currently unused; `id` is the test-instance index used to
    /// isolate log/data directories of parallel runs.
    pub fn get_args(
        &self,
        sqlness_home: &Path,
        _env: &Env,
        db_ctx: &GreptimeDBContext,
        id: usize,
    ) -> Vec<String> {
        let mut args = vec![
            DEFAULT_LOG_LEVEL.to_string(),
            self.name().to_string(),
            "start".to_string(),
        ];
        match self {
            ServerMode::Standalone {
                http_addr,
                rpc_bind_addr,
                mysql_addr,
                postgres_addr,
            } => {
                args.extend([
                    format!(
                        "--log-dir={}/greptimedb-{}-standalone/logs",
                        sqlness_home.display(),
                        id
                    ),
                    "-c".to_string(),
                    self.generate_config_file(sqlness_home, db_ctx, id),
                    format!("--http-addr={http_addr}"),
                    format!("--rpc-addr={rpc_bind_addr}"),
                    format!("--mysql-addr={mysql_addr}"),
                    format!("--postgres-addr={postgres_addr}"),
                ]);
            }
            ServerMode::Frontend {
                http_addr,
                rpc_bind_addr,
                mysql_addr,
                postgres_addr,
                metasrv_addr,
            } => {
                args.extend([
                    format!("--metasrv-addrs={metasrv_addr}"),
                    format!("--http-addr={http_addr}"),
                    format!("--rpc-addr={rpc_bind_addr}"),
                    format!("--mysql-addr={mysql_addr}"),
                    format!("--postgres-addr={postgres_addr}"),
                    format!(
                        "--log-dir={}/greptimedb-{}-frontend/logs",
                        sqlness_home.display(),
                        id
                    ),
                    "-c".to_string(),
                    self.generate_config_file(sqlness_home, db_ctx, id),
                ]);
            }
            ServerMode::Metasrv {
                rpc_bind_addr,
                rpc_server_addr,
                http_addr,
            } => {
                args.extend([
                    "--bind-addr".to_string(),
                    rpc_bind_addr.clone(),
                    "--server-addr".to_string(),
                    rpc_server_addr.clone(),
                    // Region failover is disabled so test runs stay
                    // deterministic.
                    "--enable-region-failover".to_string(),
                    "false".to_string(),
                    format!("--http-addr={http_addr}"),
                    format!(
                        "--log-dir={}/greptimedb-{}-metasrv/logs",
                        sqlness_home.display(),
                        id
                    ),
                    "-c".to_string(),
                    self.generate_config_file(sqlness_home, db_ctx, id),
                ]);
                if db_ctx.store_config().setup_pg {
                    // Extract the port from each "host:port" store address.
                    // NOTE(review): `split(':').nth(1).unwrap()` assumes a
                    // plain host:port form — an IPv6 or port-less address
                    // would panic here.
                    let client_ports = db_ctx
                        .store_config()
                        .store_addrs
                        .iter()
                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                        .collect::<Vec<_>>();
                    let client_port = client_ports.first().unwrap_or(&5432);
                    let pg_server_addr = format!(
                        "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
                        client_port
                    );
                    args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
                    args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
                } else if db_ctx.store_config().store_addrs.is_empty() {
                    // No external store configured: fall back to the
                    // in-memory backend.
                    args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
                }
            }
            ServerMode::Datanode {
                rpc_bind_addr,
                rpc_server_addr,
                http_addr,
                metasrv_addr,
                node_id,
            } => {
                // Datanode data home carries instance id, start time and
                // node id so parallel datanodes never share state.
                let data_home = sqlness_home.join(format!(
                    "greptimedb_{}_datanode_{}_{node_id}",
                    id,
                    db_ctx.time()
                ));
                args.extend([
                    format!("--rpc-addr={rpc_bind_addr}"),
                    format!("--rpc-server-addr={rpc_server_addr}"),
                    format!("--http-addr={http_addr}"),
                    format!("--data-home={}", data_home.display()),
                    format!("--log-dir={}/logs", data_home.display()),
                    format!("--node-id={node_id}"),
                    "-c".to_string(),
                    self.generate_config_file(sqlness_home, db_ctx, id),
                    format!("--metasrv-addrs={metasrv_addr}"),
                ]);
            }
            ServerMode::Flownode {
                rpc_bind_addr,
                rpc_server_addr,
                http_addr,
                metasrv_addr,
                node_id,
            } => {
                // Flownode is the only mode launched without a generated
                // config file ("-c" is absent here).
                args.extend([
                    format!("--rpc-addr={rpc_bind_addr}"),
                    format!("--rpc-server-addr={rpc_server_addr}"),
                    format!("--node-id={node_id}"),
                    format!(
                        "--log-dir={}/greptimedb-{}-flownode/logs",
                        sqlness_home.display(),
                        id
                    ),
                    format!("--metasrv-addrs={metasrv_addr}"),
                    format!("--http-addr={http_addr}"),
                ]);
            }
        }
        args
    }
}

View File

@@ -465,3 +465,13 @@ pub fn teardown_wal() {
println!("kafka cluster is down");
}
/// Picks an available TCP port on the loopback interface.
///
/// Asks the OS for an ephemeral port by binding to port 0, then reads back
/// the address that was actually assigned. The listener is dropped on
/// return, which frees the port for the caller to reuse.
pub fn get_random_port() -> u16 {
    use std::net::TcpListener;

    TcpListener::bind("127.0.0.1:0")
        .expect("Failed to bind to random port")
        .local_addr()
        .expect("Failed to get local address")
        .port()
}