sqlness_runner/cmd/
bare.rs1use std::path::PathBuf;
16use std::sync::Arc;
17
18use clap::{Parser, ValueEnum};
19use sqlness::interceptor::Registry;
20use sqlness::{ConfigBuilder, Runner};
21
22use crate::cmd::SqlnessConfig;
23use crate::env::bare::{Env, ServiceProvider, StoreConfig, WalConfig};
24use crate::{protocol_interceptor, util};
25
/// WAL backend selectable on the command line.
///
/// Parsed by clap as a value enum with snake_case names, so the accepted
/// spellings are `raft_engine` and `kafka`. `BareCommand::run` maps each
/// variant onto the corresponding `WalConfig` variant.
#[derive(ValueEnum, Debug, Clone)]
#[clap(rename_all = "snake_case")]
enum Wal {
    // Mapped to `WalConfig::RaftEngine`; the default for `--wal`.
    RaftEngine,
    // Mapped to `WalConfig::Kafka { .. }`; selecting it forces the run to a
    // parallelism of 1.
    Kafka,
}
32
/// Server address options, flattened into `BareCommand` and handed to the
/// test environment unchanged.
#[derive(clap::Args, Debug, Clone, Default)]
pub(crate) struct ServerAddr {
    /// Address of the server (`-s` / `--server-addr`). When set, the run is
    /// limited to a parallelism of 1.
    // NOTE(review): presumably points the tests at an already-running
    // server instead of one started by the runner; confirm in `Env`.
    #[clap(short, long)]
    pub(crate) server_addr: Option<String>,

    /// Address of the Postgres endpoint (`-p` / `--pg-server-addr`). May
    /// only be given together with `--server-addr`.
    #[clap(short, long, requires = "server_addr")]
    pub(crate) pg_server_addr: Option<String>,

    /// Address of the MySQL endpoint (`-m` / `--mysql-server-addr`). May
    /// only be given together with `--server-addr`.
    #[clap(short, long, requires = "server_addr")]
    pub(crate) mysql_server_addr: Option<String>,
}
48
49#[derive(Debug, Parser)]
50pub struct BareCommand {
52 #[clap(flatten)]
53 config: SqlnessConfig,
54
55 #[command(flatten)]
57 server_addr: ServerAddr,
58
59 #[clap(short, long, default_value = "raft_engine")]
61 wal: Wal,
62
63 #[clap(short, long)]
66 kafka_wal_broker_endpoints: Option<String>,
67
68 #[clap(long)]
71 bins_dir: Option<PathBuf>,
72
73 #[clap(long)]
76 preserve_state: bool,
77
78 #[clap(long, default_value = "true")]
80 pull_version_on_need: bool,
81
82 #[clap(long)]
84 store_addrs: Vec<String>,
85
86 #[clap(long, default_value = "false")]
88 setup_etcd: bool,
89
90 #[clap(long, default_missing_value = "", num_args(0..=1))]
92 setup_pg: Option<ServiceProvider>,
93
94 #[clap(long, default_missing_value = "", num_args(0..=1))]
96 setup_mysql: Option<ServiceProvider>,
97
98 #[clap(short, long, default_value = "0")]
100 jobs: usize,
101
102 #[clap(long)]
104 extra_args: Vec<String>,
105
106 #[clap(long, default_value = "false")]
108 enable_flat_format: bool,
109}
110
111impl BareCommand {
112 pub async fn run(mut self) {
113 let temp_dir = tempfile::Builder::new()
114 .prefix("sqlness")
115 .tempdir()
116 .unwrap();
117 let sqlness_home = temp_dir.keep();
118
119 let mut interceptor_registry: Registry = Default::default();
120 interceptor_registry.register(
121 protocol_interceptor::PREFIX,
122 Arc::new(protocol_interceptor::ProtocolInterceptorFactory),
123 );
124
125 if let Some(d) = &self.config.case_dir
126 && !d.is_dir()
127 {
128 panic!("{} is not a directory", d.display());
129 }
130 if self.jobs == 0 {
131 self.jobs = num_cpus::get() / 2;
132 }
133
134 if self.server_addr.server_addr.is_some()
137 || self.setup_etcd
138 || self.setup_pg.is_some()
139 || self.setup_mysql.is_some()
140 || matches!(self.wal, Wal::Kafka)
141 || self.kafka_wal_broker_endpoints.is_some()
142 || self.config.test_filter != ".*"
143 {
144 self.jobs = 1;
145 println!(
146 "Normalizing parallelism to 1 due to server addresses, etcd/pg/mysql/kafka setup, or test filter usage"
147 );
148 }
149
150 let config = ConfigBuilder::default()
151 .case_dir(util::get_case_dir(self.config.case_dir))
152 .fail_fast(self.config.fail_fast)
153 .test_filter(self.config.test_filter)
154 .follow_links(true)
155 .env_config_file(self.config.env_config_file)
156 .interceptor_registry(interceptor_registry)
157 .parallelism(self.jobs)
158 .env_parallelism(self.jobs)
159 .build()
160 .unwrap();
161
162 let wal = match self.wal {
163 Wal::RaftEngine => WalConfig::RaftEngine,
164 Wal::Kafka => WalConfig::Kafka {
165 needs_kafka_cluster: self.kafka_wal_broker_endpoints.is_none(),
166 broker_endpoints: self
167 .kafka_wal_broker_endpoints
168 .map(|s| s.split(',').map(|s| s.to_string()).collect())
169 .unwrap_or(vec!["127.0.0.1:9092".to_string()]),
171 },
172 };
173
174 let store = StoreConfig {
175 store_addrs: self.store_addrs.clone(),
176 setup_etcd: self.setup_etcd,
177 setup_pg: self.setup_pg,
178 setup_mysql: self.setup_mysql,
179 enable_flat_format: self.enable_flat_format,
180 };
181
182 let runner = Runner::new(
183 config,
184 Env::new(
185 sqlness_home.clone(),
186 self.server_addr,
187 wal,
188 self.pull_version_on_need,
189 self.bins_dir,
190 store,
191 self.extra_args,
192 ),
193 );
194 match runner.run().await {
195 Ok(_) => println!("\x1b[32mAll sqlness tests passed!\x1b[0m"),
196 Err(e) => {
197 println!("\x1b[31mTest failed: {}\x1b[0m", e);
198 std::process::exit(1);
199 }
200 }
201
202 if !self.preserve_state {
204 if self.setup_etcd {
205 println!("Stopping etcd");
206 util::stop_rm_etcd();
207 }
208 println!("Removing state in {:?}", sqlness_home);
210 tokio::fs::remove_dir_all(sqlness_home).await.unwrap();
211 }
212 }
213}