use std::collections::HashMap;
use std::fmt::Display;
use std::fs::OpenOptions;
use std::io;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Child, Command};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use sqlness::{Database, EnvController, QueryContext};
use tokio::sync::Mutex as TokioMutex;

use crate::client::MultiProtocolClient;
use crate::cmd::bare::ServerAddr;
use crate::formatter::{ErrorFormatter, MysqlFormatter, OutputFormatter, PostgresqlFormatter};
use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
use crate::server_mode::ServerMode;
use crate::util;
use crate::util::{PROGRAM, get_workspace_root, maybe_pull_binary};

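// Slots in `GreptimeDBContext::server_modes` for each node type. Standalone and metasrv
// share slot 0 because a context holds either a standalone instance or a distributed
// cluster, never both.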
const SERVER_MODE_STANDALONE_IDX: usize = 0;
const SERVER_MODE_METASRV_IDX: usize = 0;
const SERVER_MODE_DATANODE_START_IDX: usize = 1;
const SERVER_MODE_FRONTEND_IDX: usize = 4;
const SERVER_MODE_FLOWNODE_IDX: usize = 5;

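/// WAL configuration used when launching GreptimeDB for the tests.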
#[derive(Clone)]
pub enum WalConfig {
    RaftEngine,
    Kafka {
        /// Whether the runner needs to set up (and later tear down) a local Kafka cluster.
        needs_kafka_cluster: bool,
        broker_endpoints: Vec<String>,
    },
}

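/// How a backing service (e.g. PostgreSQL or MySQL) is provided: created by the runner
/// itself, or reused from an external address.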
#[derive(Debug, Clone)]
pub(crate) enum ServiceProvider {
    Create,
    External(String),
}

impl From<&str> for ServiceProvider {
    fn from(value: &str) -> Self {
        if value.is_empty() {
            Self::Create
        } else {
            Self::External(value.to_string())
        }
    }
}

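/// Configuration of the backing stores (etcd / PostgreSQL / MySQL) that the runner may
/// set up itself or reach at the given addresses.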
#[derive(Clone)]
pub struct StoreConfig {
    pub store_addrs: Vec<String>,
    pub setup_etcd: bool,
    pub(crate) setup_pg: Option<ServiceProvider>,
    pub(crate) setup_mysql: Option<ServiceProvider>,
    pub enable_flat_format: bool,
}

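/// The sqlness environment controller: it locates or builds the GreptimeDB binary, starts
/// standalone or distributed deployments, and restarts or stops them as the tests require.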
#[derive(Clone)]
pub struct Env {
    sqlness_home: PathBuf,
    server_addrs: ServerAddr,
    wal: WalConfig,

    /// Directory of the binary currently in use.
    bins_dir: Arc<Mutex<Option<PathBuf>>>,
    /// Binary directories keyed by version string.
    versioned_bins_dirs: Arc<Mutex<HashMap<String, PathBuf>>>,
    /// Whether to pull a requested binary version if it is not present locally.
    pull_version_on_need: bool,
    /// Configuration of the backing stores.
    store_config: StoreConfig,
    /// Extra command-line arguments appended when starting the server.
    extra_args: Vec<String>,
}

#[async_trait]
impl EnvController for Env {
    type DB = GreptimeDB;

    async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
        if self.server_addrs.server_addr.is_some() && id > 0 {
            panic!("Parallel test mode is not supported when server address is already set.");
        }

        unsafe {
            std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string());
        }
        match mode {
            "standalone" => self.start_standalone(id).await,
            "distributed" => self.start_distributed(id).await,
            _ => panic!("Unexpected mode: {mode}"),
        }
    }

    async fn stop(&self, _mode: &str, mut database: Self::DB) {
        database.stop();
    }
}

impl Env {
    pub fn new(
        data_home: PathBuf,
        server_addrs: ServerAddr,
        wal: WalConfig,
        pull_version_on_need: bool,
        bins_dir: Option<PathBuf>,
        store_config: StoreConfig,
        extra_args: Vec<String>,
    ) -> Self {
        Self {
            sqlness_home: data_home,
            server_addrs,
            wal,
            pull_version_on_need,
            bins_dir: Arc::new(Mutex::new(bins_dir.clone())),
            versioned_bins_dirs: Arc::new(Mutex::new(HashMap::from_iter([(
                "latest".to_string(),
                bins_dir.clone().unwrap_or(util::get_binary_dir("debug")),
            )]))),
            store_config,
            extra_args,
        }
    }

    async fn start_standalone(&self, id: usize) -> GreptimeDB {
        println!("Starting standalone instance id: {id}");

        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            let server_mode = ServerMode::random_standalone();
            db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX);
            let server_addr = server_mode.server_addr().unwrap();
            let server_process = self.start_server(server_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process])));
            greptimedb.is_standalone = true;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

    async fn start_distributed(&self, id: usize) -> GreptimeDB {
        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            self.setup_etcd();
            self.setup_pg();
            self.setup_mysql().await;
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            let meta_server_mode = ServerMode::random_metasrv();
            // Extract the metasrv gRPC port so the other nodes can connect to it.
            let metasrv_port = match &meta_server_mode {
                ServerMode::Metasrv {
                    rpc_server_addr, ..
                } => rpc_server_addr
                    .split(':')
                    .nth(1)
                    .unwrap()
                    .parse::<u16>()
                    .unwrap(),
                _ => panic!(
                    "metasrv mode not set, maybe running in remote mode which doesn't support restart?"
                ),
            };
            db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX);
            let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await;

            let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0);
            db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX);
            let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await;
            let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1);
            db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1);
            let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await;
            let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2);
            db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2);
            let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await;

            let frontend_mode = ServerMode::random_frontend(metasrv_port);
            let server_addr = frontend_mode.server_addr().unwrap();
            db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX);
            let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await;

            let flownode_mode = ServerMode::random_flownode(metasrv_port, 0);
            db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX);
            let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;

            greptimedb.metasrv_process = Some(meta_server).into();
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![
                datanode_1, datanode_2, datanode_3,
            ])));
            greptimedb.frontend_process = Some(frontend).into();
            greptimedb.flownode_process = Some(flownode).into();
            greptimedb.is_standalone = false;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

    async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB {
        let grpc_server_addr = server_addr.server_addr.as_ref().unwrap();
        let pg_server_addr = server_addr.pg_server_addr.as_ref().unwrap();
        let mysql_server_addr = server_addr.mysql_server_addr.as_ref().unwrap();

        let client =
            MultiProtocolClient::connect(grpc_server_addr, pg_server_addr, mysql_server_addr).await;
        GreptimeDB {
            client: TokioMutex::new(client),
            server_processes: None,
            metasrv_process: None.into(),
            frontend_process: None.into(),
            flownode_process: None.into(),
            ctx: GreptimeDBContext {
                time: 0,
                datanode_id: Default::default(),
                wal: self.wal.clone(),
                store_config: self.store_config.clone(),
                server_modes: Vec::new(),
            },
            is_standalone: false,
            env: self.clone(),
            id,
        }
    }

    fn stop_server(process: &mut Child) {
        let _ = process.kill();
        let _ = process.wait();
    }

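    /// Starts a single server process in the given `mode`, redirecting its stdout to a log
    /// file under `sqlness_home`. Panics if any port the server should listen on is already
    /// occupied, and waits up to 30 seconds for the process to become reachable.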
    async fn start_server(
        &self,
        mode: ServerMode,
        db_ctx: &GreptimeDBContext,
        id: usize,
        truncate_log: bool,
    ) -> Child {
        let log_file_name = match mode {
            ServerMode::Datanode { node_id, .. } => {
                db_ctx.incr_datanode_id();
                format!("greptime-{}-sqlness-datanode-{}.log", id, node_id)
            }
            ServerMode::Flownode { .. } => format!("greptime-{}-sqlness-flownode.log", id),
            ServerMode::Frontend { .. } => format!("greptime-{}-sqlness-frontend.log", id),
            ServerMode::Metasrv { .. } => format!("greptime-{}-sqlness-metasrv.log", id),
            ServerMode::Standalone { .. } => format!("greptime-{}-sqlness-standalone.log", id),
        };
        let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string();

        println!("DB instance {id} log file at {stdout_file_name}");

        let stdout_file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(truncate_log)
            .append(!truncate_log)
            .open(&stdout_file_name)
            .unwrap();

        let args = mode.get_args(&self.sqlness_home, self, db_ctx, id);
        let check_ip_addrs = mode.check_addrs();

        for check_ip_addr in &check_ip_addrs {
            if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await {
                panic!("Port {check_ip_addr} is already in use, please check and retry.");
            }
        }

        let program = PROGRAM;

        let bins_dir = self.bins_dir.lock().unwrap().clone().expect(
            "GreptimeDB binary is not available. Please pass in the path to the directory that contains the pre-built GreptimeDB binary, or call `self.build_db()` beforehand.",
        );

        let abs_bins_dir = bins_dir
            .canonicalize()
            .expect("Failed to canonicalize bins_dir");

        let mut process = Command::new(abs_bins_dir.join(program))
            .current_dir(bins_dir.clone())
            .env("TZ", "UTC")
            .args(args)
            .stdout(stdout_file)
            .spawn()
            .unwrap_or_else(|error| {
                panic!(
                    "Failed to start the DB with subcommand {}, Error: {error}, path: {:?}",
                    mode.name(),
                    bins_dir.join(program)
                );
            });

        for check_ip_addr in &check_ip_addrs {
            if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(30)).await {
                Env::stop_server(&mut process);
                panic!(
                    "{} didn't start up within 30 seconds, check {} for more details.",
                    mode.name(),
                    stdout_file_name
                )
            }
        }

        process
    }

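    /// Restarts the server processes of `db`. For a standalone instance the single server is
    /// restarted and the clients reconnected. For a distributed cluster the datanodes and
    /// flownode are always restarted; metasrv and frontend are restarted only when
    /// `is_full_restart` is true.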
    async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) {
        // Stop the currently running processes first.
        {
            if let Some(server_process) = db.server_processes.clone() {
                let mut server_processes = server_process.lock().unwrap();
                for server_process in server_processes.iter_mut() {
                    Env::stop_server(server_process);
                }
            }

            if is_full_restart {
                if let Some(mut metasrv_process) =
                    db.metasrv_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut metasrv_process);
                }
                if let Some(mut frontend_process) =
                    db.frontend_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut frontend_process);
                }
            }

            if let Some(mut flownode_process) =
                db.flownode_process.lock().expect("poisoned lock").take()
            {
                Env::stop_server(&mut flownode_process);
            }
        }

        let new_server_processes = if db.is_standalone {
            // Restart the standalone server and reconnect the clients.
            let server_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_STANDALONE_IDX)
                .cloned()
                .unwrap();
            let server_addr = server_mode.server_addr().unwrap();
            let new_server_process = self.start_server(server_mode, &db.ctx, db.id, false).await;

            let mut client = db.client.lock().await;
            client
                .reconnect_mysql_client(&server_addr.mysql_server_addr.unwrap())
                .await;
            client
                .reconnect_pg_client(&server_addr.pg_server_addr.unwrap())
                .await;
            vec![new_server_process]
        } else {
            db.ctx.reset_datanode_id();
            if is_full_restart {
                let metasrv_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_METASRV_IDX)
                    .cloned()
                    .unwrap();
                let metasrv = self.start_server(metasrv_mode, &db.ctx, db.id, false).await;
                db.metasrv_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(metasrv);

                // Give the metasrv a moment to become ready before starting the other nodes.
                tokio::time::sleep(Duration::from_secs(5)).await;
            }

            let mut processes = vec![];
            for i in 0..3 {
                let datanode_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_DATANODE_START_IDX + i)
                    .cloned()
                    .unwrap();
                let new_server_process = self
                    .start_server(datanode_mode, &db.ctx, db.id, false)
                    .await;
                processes.push(new_server_process);
            }

            if is_full_restart {
                let frontend_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_FRONTEND_IDX)
                    .cloned()
                    .unwrap();
                let frontend = self
                    .start_server(frontend_mode, &db.ctx, db.id, false)
                    .await;
                db.frontend_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(frontend);
            }

            let flownode_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_FLOWNODE_IDX)
                .cloned()
                .unwrap();
            let flownode = self
                .start_server(flownode_mode, &db.ctx, db.id, false)
                .await;
            db.flownode_process
                .lock()
                .expect("lock poisoned")
                .replace(flownode);

            processes
        };

        if let Some(server_processes) = db.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            *server_processes = new_server_processes;
        }
    }

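    /// Sets up a Kafka cluster for the remote WAL if the configuration asks for one.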
    fn setup_wal(&self) {
        if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
            util::setup_wal();
        }
    }

    /// Sets up an etcd cluster if the store config requires it, using the client ports parsed
    /// from `store_addrs`.
    fn setup_etcd(&self) {
        if self.store_config.setup_etcd {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            util::setup_etcd(client_ports, None, None);
        }
    }

    /// Sets up a PostgreSQL server if the store config asks the runner to create one.
    fn setup_pg(&self) {
        if matches!(self.store_config.setup_pg, Some(ServiceProvider::Create)) {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&5432);
            util::setup_pg(*client_port, None);
        }
    }

    /// Sets up a MySQL server if the store config asks the runner to create one.
    async fn setup_mysql(&self) {
        if matches!(self.store_config.setup_mysql, Some(ServiceProvider::Create)) {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&3306);
            util::setup_mysql(*client_port, None);

            // MySQL needs some time to bootstrap before it can accept connections.
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    }

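    /// Builds the `greptime` binary with `cargo build` if no pre-built binary directory was
    /// supplied, then records the debug binary directory as the one to run.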
    fn build_db(&self) {
        if self.bins_dir.lock().unwrap().is_some() {
            return;
        }

        println!("Going to build the DB...");
        let output = Command::new("cargo")
            .current_dir(util::get_workspace_root())
            .args([
                "build",
                "--bin",
                "greptime",
                "--features",
                "pg_kvbackend,mysql_kvbackend",
            ])
            .output()
            .expect("Failed to execute `cargo build`");
        if !output.status.success() {
            println!("Failed to build GreptimeDB, {}", output.status);
            println!("Cargo build stdout:");
            io::stdout().write_all(&output.stdout).unwrap();
            println!("Cargo build stderr:");
            io::stderr().write_all(&output.stderr).unwrap();
            panic!();
        }

        let _ = self
            .bins_dir
            .lock()
            .unwrap()
            .insert(util::get_binary_dir("debug"));
    }

    pub(crate) fn extra_args(&self) -> &Vec<String> {
        &self.extra_args
    }
}

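/// A running GreptimeDB deployment (standalone or distributed) together with the
/// multi-protocol client used to execute queries against it.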
pub struct GreptimeDB {
    server_processes: Option<Arc<Mutex<Vec<Child>>>>,
    metasrv_process: Mutex<Option<Child>>,
    frontend_process: Mutex<Option<Child>>,
    flownode_process: Mutex<Option<Child>>,
    client: TokioMutex<MultiProtocolClient>,
    ctx: GreptimeDBContext,
    is_standalone: bool,
    env: Env,
    id: usize,
}

impl GreptimeDB {
    async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.postgres_query(&query).await {
            Ok(rows) => Box::new(PostgresqlFormatter::from(rows)),
            Err(e) => Box::new(e),
        }
    }

    async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.mysql_query(&query).await {
            Ok(res) => Box::new(MysqlFormatter::from(res)),
            Err(e) => Box::new(e),
        }
    }

    async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.grpc_query(&query).await {
            Ok(rows) => Box::new(OutputFormatter::from(rows)),
            Err(e) => Box::new(ErrorFormatter::from(e)),
        }
    }
}

#[async_trait]
impl Database for GreptimeDB {
    async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
        if ctx.context.contains_key("restart") && self.env.server_addrs.server_addr.is_none() {
            self.env.restart_server(self, false).await;
        } else if let Some(version) = ctx.context.get("version") {
            let version_bin_dir = self
                .env
                .versioned_bins_dirs
                .lock()
                .expect("lock poison")
                .get(version.as_str())
                .cloned();

            match version_bin_dir {
                Some(path) if path.join(PROGRAM).is_file() => {
                    // The binary of the requested version is already present locally.
                    *self.env.bins_dir.lock().unwrap() = Some(path.clone());
                }
                _ => {
                    // Pull the binary of the requested version if allowed, then point at it.
                    maybe_pull_binary(version, self.env.pull_version_on_need).await;
                    let root = get_workspace_root();
                    let new_path = PathBuf::from_iter([&root, version]);
                    *self.env.bins_dir.lock().unwrap() = Some(new_path);
                }
            }

            self.env.restart_server(self, true).await;
            // Give the restarted servers time to settle before running the next query.
            tokio::time::sleep(Duration::from_secs(5)).await;
        }

        if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
            // The protocol interceptor selects either the MySQL or the PostgreSQL client.
            if protocol == MYSQL {
                self.mysql_query(ctx, query).await
            } else {
                self.postgres_query(ctx, query).await
            }
        } else {
            self.grpc_query(ctx, query).await
        }
    }
}

impl GreptimeDB {
    fn stop(&mut self) {
        if let Some(server_processes) = self.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            for mut server_process in server_processes.drain(..) {
                Env::stop_server(&mut server_process);
                println!(
                    "Standalone or Datanode (pid = {}) is stopped",
                    server_process.id()
                );
            }
        }
        if let Some(mut metasrv) = self
            .metasrv_process
            .lock()
            .expect("someone else panicked when holding the lock")
            .take()
        {
            Env::stop_server(&mut metasrv);
            println!("Metasrv (pid = {}) is stopped", metasrv.id());
        }
        if let Some(mut frontend) = self
            .frontend_process
            .lock()
            .expect("someone else panicked when holding the lock")
            .take()
        {
            Env::stop_server(&mut frontend);
            println!("Frontend (pid = {}) is stopped", frontend.id());
        }
        if let Some(mut flownode) = self
            .flownode_process
            .lock()
            .expect("someone else panicked when holding the lock")
            .take()
        {
            Env::stop_server(&mut flownode);
            println!("Flownode (pid = {}) is stopped", flownode.id());
        }
        if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
        {
            util::teardown_wal();
        }
    }
}

impl Drop for GreptimeDB {
    fn drop(&mut self) {
        if self.env.server_addrs.server_addr.is_none() {
            self.stop();
        }
    }
}

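/// Per-deployment bookkeeping shared between `Env` and the server modes: the creation
/// timestamp, the next datanode id, the WAL and store configuration, and the `ServerMode`
/// of every started node (indexed by the `SERVER_MODE_*_IDX` constants).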
pub struct GreptimeDBContext {
    /// The time (in milliseconds) at which this context was created.
    time: i64,
    datanode_id: AtomicU32,
    wal: WalConfig,
    store_config: StoreConfig,
    server_modes: Vec<ServerMode>,
}

impl GreptimeDBContext {
    pub fn new(wal: WalConfig, store_config: StoreConfig) -> Self {
        Self {
            time: common_time::util::current_time_millis(),
            datanode_id: AtomicU32::new(0),
            wal,
            store_config,
            server_modes: Vec::new(),
        }
    }

    pub(crate) fn time(&self) -> i64 {
        self.time
    }

    pub fn is_raft_engine(&self) -> bool {
        matches!(self.wal, WalConfig::RaftEngine)
    }

    pub fn kafka_wal_broker_endpoints(&self) -> String {
        match &self.wal {
            WalConfig::RaftEngine => String::new(),
            WalConfig::Kafka {
                broker_endpoints, ..
            } => serde_json::to_string(&broker_endpoints).unwrap(),
        }
    }

    fn incr_datanode_id(&self) {
        let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed);
    }

    fn reset_datanode_id(&self) {
        self.datanode_id.store(0, Ordering::Relaxed);
    }

    pub(crate) fn store_config(&self) -> StoreConfig {
        self.store_config.clone()
    }

    /// Records the `ServerMode` used for the node at `idx`, growing the vector if needed.
    fn set_server_mode(&mut self, mode: ServerMode, idx: usize) {
        if idx >= self.server_modes.len() {
            self.server_modes.resize(idx + 1, mode.clone());
        }
        self.server_modes[idx] = mode;
    }

    fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> {
        self.server_modes.get(idx)
    }
}