feat: manager kafka cluster in sqlness runner (#3143)

* feat: manager kafka cluster in sqlness runner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* pull up clippy config

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: niebayes <niebayes@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: niebayes <niebayes@gmail.com>
This commit is contained in:
Ruihang Xia
2024-01-11 17:47:19 +08:00
committed by GitHub
parent 09b3c7029b
commit b00b49284e
5 changed files with 124 additions and 16 deletions

View File

@@ -44,7 +44,12 @@ const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafus
#[derive(Clone)]
pub enum WalConfig {
RaftEngine,
Kafka { broker_endpoints: Vec<String> },
Kafka {
/// Indicates whether the runner needs to start a kafka cluster
/// (it might be available in the external system environment).
needs_kafka_cluster: bool,
broker_endpoints: Vec<String>,
},
}
#[derive(Clone)]
@@ -54,7 +59,6 @@ pub struct Env {
wal: WalConfig,
}
#[allow(clippy::print_stdout)]
#[async_trait]
impl EnvController for Env {
type DB = GreptimeDB;
@@ -73,7 +77,6 @@ impl EnvController for Env {
}
}
#[allow(clippy::print_stdout)]
impl Env {
pub fn new(data_home: PathBuf, server_addr: Option<String>, wal: WalConfig) -> Self {
Self {
@@ -88,6 +91,7 @@ impl Env {
self.connect_db(&server_addr)
} else {
Self::build_db().await;
self.setup_wal();
let db_ctx = GreptimeDBContext::new(self.wal.clone());
@@ -113,6 +117,7 @@ impl Env {
self.connect_db(&server_addr)
} else {
Self::build_db().await;
self.setup_wal();
let db_ctx = GreptimeDBContext::new(self.wal.clone());
@@ -317,13 +322,18 @@ impl Env {
}
}
/// Setup kafka wal cluster if needed. The conterpart is in [GreptimeDB::stop].
fn setup_wal(&self) {
if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
util::setup_wal();
}
}
/// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
let mut tt = TinyTemplate::new();
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.pop();
path.push("conf");
let mut path = util::sqlness_conf_path();
path.push(format!("{subcommand}-test.toml.template"));
let template = std::fs::read_to_string(path).unwrap();
tt.add_template(subcommand, &template).unwrap();
@@ -433,7 +443,6 @@ impl Database for GreptimeDB {
}
impl GreptimeDB {
#![allow(clippy::print_stdout)]
fn stop(&mut self) {
if let Some(server_processes) = self.server_processes.clone() {
let mut server_processes = server_processes.lock().unwrap();
@@ -447,6 +456,10 @@ impl GreptimeDB {
if let Some(mut datanode) = self.frontend_process.take() {
Env::stop_server(&mut datanode);
}
if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
{
util::teardown_wal();
}
println!("Stopped DB.");
}
}
@@ -482,9 +495,9 @@ impl GreptimeDBContext {
fn kafka_wal_broker_endpoints(&self) -> String {
match &self.wal {
WalConfig::RaftEngine => String::new(),
WalConfig::Kafka { broker_endpoints } => {
serde_json::to_string(&broker_endpoints).unwrap()
}
WalConfig::Kafka {
broker_endpoints, ..
} => serde_json::to_string(&broker_endpoints).unwrap(),
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(clippy::print_stdout)]
use std::path::PathBuf;
use clap::{Parser, ValueEnum};
@@ -56,9 +58,10 @@ struct Args {
#[clap(short, long, default_value = "raft_engine")]
wal: Wal,
/// The kafka wal broker endpoints.
#[clap(short, long, default_value = "127.0.0.1:9092")]
kafka_wal_broker_endpoints: String,
/// The kafka wal broker endpoints. This config will suppress sqlness runner
/// from starting a kafka cluster, and use the given endpoint as kafka backend.
#[clap(short, long)]
kafka_wal_broker_endpoints: Option<String>,
}
#[tokio::main]
@@ -82,11 +85,12 @@ async fn main() {
let wal = match args.wal {
Wal::RaftEngine => WalConfig::RaftEngine,
Wal::Kafka => WalConfig::Kafka {
needs_kafka_cluster: args.kafka_wal_broker_endpoints.is_none(),
broker_endpoints: args
.kafka_wal_broker_endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect(),
.map(|s| s.split(',').map(|s| s.to_string()).collect())
// otherwise default to the same port in `kafka-cluster.yml`
.unwrap_or(vec!["127.0.0.1:9092".to_string()]),
},
};

View File

@@ -15,6 +15,7 @@
use std::fmt::Display;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::Command;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
@@ -119,3 +120,57 @@ pub async fn check_port(ip_addr: SocketAddr, timeout: Duration) -> bool {
tokio::time::timeout(timeout, check_task).await.is_ok()
}
/// Get the path of sqlness config dir `tests/conf`.
pub fn sqlness_conf_path() -> PathBuf {
let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
sqlness_root_path.pop();
sqlness_root_path.push("conf");
sqlness_root_path
}
/// Start kafka cluster if needed. Config file is `conf/kafka-cluster.yml`.
///
/// ```shell
/// docker compose -f kafka-cluster.yml up kafka -d --wait
/// ```
pub fn setup_wal() {
let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
sqlness_root_path.pop();
sqlness_root_path.push("conf");
Command::new("docker")
.current_dir(sqlness_conf_path())
.args([
"compose",
"-f",
"kafka-cluster.yml",
"up",
"kafka",
"-d",
"--wait",
])
.output()
.expect("Failed to start kafka cluster");
println!("kafka cluster is up");
}
/// Stop kafka cluster if needed. Config file is `conf/kafka-cluster.yml`.
///
/// ```shell
/// docker compose -f docker-compose-standalone.yml down kafka
/// ```
pub fn teardown_wal() {
let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
sqlness_root_path.pop();
sqlness_root_path.push("conf");
Command::new("docker")
.current_dir(sqlness_conf_path())
.args(["compose", "-f", "kafka-cluster.yml", "down", "kafka"])
.output()
.expect("Failed to stop kafka cluster");
println!("kafka cluster is down");
}