From b00b49284e54fe499893094795ffb8389d395ab5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 11 Jan 2024 17:47:19 +0800 Subject: [PATCH] feat: manage kafka cluster in sqlness runner (#3143) * feat: manage kafka cluster in sqlness runner Signed-off-by: Ruihang Xia * pull up clippy config Signed-off-by: Ruihang Xia * Apply suggestions from code review Co-authored-by: niebayes --------- Signed-off-by: Ruihang Xia Co-authored-by: niebayes --- tests/README.md | 14 +++++++++ tests/conf/kafka-cluster.yml | 22 +++++++++++++++ tests/runner/src/env.rs | 33 +++++++++++++++------- tests/runner/src/main.rs | 16 +++++++---- tests/runner/src/util.rs | 55 ++++++++++++++++++++++++++++++++++++ 5 files changed, 124 insertions(+), 16 deletions(-) create mode 100644 tests/conf/kafka-cluster.yml diff --git a/tests/README.md b/tests/README.md index 58d6032523..ae2d42d9c5 100644 --- a/tests/README.md +++ b/tests/README.md @@ -23,6 +23,20 @@ modes. E.g., `standalone/` contains all the tests to run under `greptimedb stand Under the first level of subdirectory (e.g. the `cases/standalone`), you can organize your cases as you like. Sqlness walks through every file recursively and runs them. +### Kafka WAL + +Sqlness supports Kafka WAL. You can either provide a Kafka cluster or let sqlness start one for you. + +To run tests with kafka, you need to pass the option `-w kafka`. If no other options are provided, sqlness will use `conf/kafka-cluster.yml` to start a Kafka cluster. This requires `docker` and `docker-compose` commands in your environment. + +Otherwise, you can additionally pass your existing kafka environment to sqlness with the `-k` option. E.g.: + +```shell +cargo sqlness -w kafka -k localhost:9092 +``` + +In this case, sqlness will not start its own kafka cluster and will use the one you provided instead. + ## Run the test Unlike other tests, this harness is in a binary target form. 
You can run it with: diff --git a/tests/conf/kafka-cluster.yml b/tests/conf/kafka-cluster.yml new file mode 100644 index 0000000000..b928f9e1ec --- /dev/null +++ b/tests/conf/kafka-cluster.yml @@ -0,0 +1,22 @@ +# docker-compose file for starting a kafka cluster when kafka wal is enabled +version: '3.8' +services: + kafka: + image: bitnami/kafka:3.6.0 + container_name: kafka + ports: + - 9092:9092 + environment: + # KRaft settings + KAFKA_KRAFT_CLUSTER_ID: Kmp-xkTnSf-WWXhWmiorDg + KAFKA_ENABLE_KRAFT: "yes" + KAFKA_CFG_NODE_ID: "1" + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181 + # Listeners + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181 + ALLOW_PLAINTEXT_LISTENER: "yes" + KAFKA_BROKER_ID: "1" diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 76946ad6ba..3df634d5f9 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -44,7 +44,12 @@ const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafus #[derive(Clone)] pub enum WalConfig { RaftEngine, - Kafka { broker_endpoints: Vec }, + Kafka { + /// Indicates whether the runner needs to start a kafka cluster + /// (it might be available in the external system environment). 
+ needs_kafka_cluster: bool, + broker_endpoints: Vec, + }, } #[derive(Clone)] @@ -54,7 +59,6 @@ pub struct Env { wal: WalConfig, } -#[allow(clippy::print_stdout)] #[async_trait] impl EnvController for Env { type DB = GreptimeDB; @@ -73,7 +77,6 @@ impl EnvController for Env { } } -#[allow(clippy::print_stdout)] impl Env { pub fn new(data_home: PathBuf, server_addr: Option, wal: WalConfig) -> Self { Self { @@ -88,6 +91,7 @@ impl Env { self.connect_db(&server_addr) } else { Self::build_db().await; + self.setup_wal(); let db_ctx = GreptimeDBContext::new(self.wal.clone()); @@ -113,6 +117,7 @@ impl Env { self.connect_db(&server_addr) } else { Self::build_db().await; + self.setup_wal(); let db_ctx = GreptimeDBContext::new(self.wal.clone()); @@ -317,13 +322,18 @@ impl Env { } } + /// Set up kafka wal cluster if needed. The counterpart is in [GreptimeDB::stop]. + fn setup_wal(&self) { + if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) { + util::setup_wal(); + } + } + /// Generate config file to `/tmp/{subcommand}-{current_time}.toml` fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String { let mut tt = TinyTemplate::new(); - let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - path.pop(); - path.push("conf"); + let mut path = util::sqlness_conf_path(); path.push(format!("{subcommand}-test.toml.template")); let template = std::fs::read_to_string(path).unwrap(); tt.add_template(subcommand, &template).unwrap(); @@ -433,7 +443,6 @@ impl Database for GreptimeDB { } impl GreptimeDB { - #![allow(clippy::print_stdout)] fn stop(&mut self) { if let Some(server_processes) = self.server_processes.clone() { let mut server_processes = server_processes.lock().unwrap(); @@ -447,6 +456,10 @@ impl GreptimeDB { if let Some(mut datanode) = self.frontend_process.take() { Env::stop_server(&mut datanode); } + if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. 
+ } if needs_kafka_cluster) + { + util::teardown_wal(); + } println!("Stopped DB."); } } @@ -482,9 +495,9 @@ impl GreptimeDBContext { fn kafka_wal_broker_endpoints(&self) -> String { match &self.wal { WalConfig::RaftEngine => String::new(), - WalConfig::Kafka { broker_endpoints } => { - serde_json::to_string(&broker_endpoints).unwrap() - } + WalConfig::Kafka { + broker_endpoints, .. + } => serde_json::to_string(&broker_endpoints).unwrap(), } } diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs index 5fdddbd714..c1cd011056 100644 --- a/tests/runner/src/main.rs +++ b/tests/runner/src/main.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![allow(clippy::print_stdout)] + use std::path::PathBuf; use clap::{Parser, ValueEnum}; @@ -56,9 +58,10 @@ struct Args { #[clap(short, long, default_value = "raft_engine")] wal: Wal, - /// The kafka wal broker endpoints. - #[clap(short, long, default_value = "127.0.0.1:9092")] - kafka_wal_broker_endpoints: String, + /// The kafka wal broker endpoints. This config will prevent the sqlness runner + /// from starting a kafka cluster, and use the given endpoint as the kafka backend. 
+ #[clap(short, long)] + kafka_wal_broker_endpoints: Option, } #[tokio::main] @@ -82,11 +85,12 @@ async fn main() { let wal = match args.wal { Wal::RaftEngine => WalConfig::RaftEngine, Wal::Kafka => WalConfig::Kafka { + needs_kafka_cluster: args.kafka_wal_broker_endpoints.is_none(), broker_endpoints: args .kafka_wal_broker_endpoints - .split(',') - .map(|s| s.trim().to_string()) - .collect(), + .map(|s| s.split(',').map(|s| s.to_string()).collect()) + // otherwise default to the same port in `kafka-cluster.yml` + .unwrap_or(vec!["127.0.0.1:9092".to_string()]), }, }; diff --git a/tests/runner/src/util.rs b/tests/runner/src/util.rs index c51b81db25..71ae585a81 100644 --- a/tests/runner/src/util.rs +++ b/tests/runner/src/util.rs @@ -15,6 +15,7 @@ use std::fmt::Display; use std::net::SocketAddr; use std::path::PathBuf; +use std::process::Command; use std::time::Duration; use tokio::io::AsyncWriteExt; @@ -119,3 +120,57 @@ pub async fn check_port(ip_addr: SocketAddr, timeout: Duration) -> bool { tokio::time::timeout(timeout, check_task).await.is_ok() } + +/// Get the path of sqlness config dir `tests/conf`. +pub fn sqlness_conf_path() -> PathBuf { + let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + sqlness_root_path.pop(); + sqlness_root_path.push("conf"); + sqlness_root_path +} + +/// Start kafka cluster if needed. Config file is `conf/kafka-cluster.yml`. +/// +/// ```shell +/// docker compose -f kafka-cluster.yml up kafka -d --wait +/// ``` +pub fn setup_wal() { + let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + sqlness_root_path.pop(); + sqlness_root_path.push("conf"); + + Command::new("docker") + .current_dir(sqlness_conf_path()) + .args([ + "compose", + "-f", + "kafka-cluster.yml", + "up", + "kafka", + "-d", + "--wait", + ]) + .output() + .expect("Failed to start kafka cluster"); + + println!("kafka cluster is up"); +} + +/// Stop kafka cluster if needed. Config file is `conf/kafka-cluster.yml`. 
+/// +/// ```shell +/// docker compose -f kafka-cluster.yml down kafka +/// ``` +pub fn teardown_wal() { + let mut sqlness_root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + sqlness_root_path.pop(); + sqlness_root_path.push("conf"); + + Command::new("docker") + .current_dir(sqlness_conf_path()) + .args(["compose", "-f", "kafka-cluster.yml", "down", "kafka"]) + .output() + .expect("Failed to stop kafka cluster") ; + + println!("kafka cluster is down"); +}