1use std::collections::HashMap;
16use std::fmt::Display;
17use std::fs::OpenOptions;
18use std::io;
19use std::io::Write;
20use std::path::{Path, PathBuf};
21use std::process::{Child, Command};
22use std::sync::atomic::{AtomicU32, Ordering};
23use std::sync::{Arc, Mutex};
24use std::time::Duration;
25
26use async_trait::async_trait;
27use sqlness::{Database, EnvController, QueryContext};
28use tokio::sync::Mutex as TokioMutex;
29
30use crate::client::MultiProtocolClient;
31use crate::cmd::bare::ServerAddr;
32use crate::formatter::{ErrorFormatter, MysqlFormatter, OutputFormatter, PostgresqlFormatter};
33use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
34use crate::server_mode::ServerMode;
35use crate::util;
36use crate::util::{PROGRAM, get_workspace_root, maybe_pull_binary};
37
// Slot indices into `GreptimeDBContext::server_modes`, used to record each
// server's mode at startup so `restart_server` can relaunch it with the
// same addresses/configuration.
//
// Standalone mode uses a single slot (0). Distributed mode lays slots out
// as: metasrv at 0, three datanodes at 1..=3, frontend at 4, flownode at 5.
const SERVER_MODE_STANDALONE_IDX: usize = 0;
const SERVER_MODE_METASRV_IDX: usize = 0;
const SERVER_MODE_DATANODE_START_IDX: usize = 1;
const SERVER_MODE_FRONTEND_IDX: usize = 4;
const SERVER_MODE_FLOWNODE_IDX: usize = 5;
45
/// How the write-ahead log is backed for a test run.
#[derive(Clone)]
pub enum WalConfig {
    /// Built-in raft-engine WAL; needs no external services.
    RaftEngine,
    /// Kafka-backed remote WAL.
    Kafka {
        /// When true, a local Kafka cluster is brought up before the test
        /// (see `Env::setup_wal`) and torn down in `GreptimeDB::stop`.
        needs_kafka_cluster: bool,
        /// Kafka broker addresses, serialized to JSON for the server config
        /// via `GreptimeDBContext::kafka_wal_broker_endpoints`.
        broker_endpoints: Vec<String>,
    },
}
56
/// Where a backing store service (PostgreSQL/MySQL kvbackend) comes from.
#[derive(Debug, Clone)]
pub(crate) enum ServiceProvider {
    /// Spin the service up locally as part of the test environment.
    Create,
    /// Use an already-running external service at the given address.
    External(String),
}
62
63impl From<&str> for ServiceProvider {
64 fn from(value: &str) -> Self {
65 if value.is_empty() {
66 Self::Create
67 } else {
68 Self::External(value.to_string())
69 }
70 }
71}
72
/// Configuration for the metadata store backing the cluster.
#[derive(Clone)]
pub struct StoreConfig {
    /// Addresses (`host:port`) of the store endpoints; the port component
    /// is parsed out when locally launching etcd/PostgreSQL/MySQL.
    pub store_addrs: Vec<String>,
    /// Whether to launch a local etcd cluster for the test run.
    pub setup_etcd: bool,
    /// PostgreSQL kvbackend provider; `None` disables PostgreSQL setup.
    pub(crate) setup_pg: Option<ServiceProvider>,
    /// MySQL kvbackend provider; `None` disables MySQL setup.
    pub(crate) setup_mysql: Option<ServiceProvider>,
    /// Flat-format flag; not read within this file — presumably forwarded
    /// into generated server configs (TODO confirm at call sites).
    pub enable_flat_format: bool,
}
81
/// sqlness environment controller: builds/starts/stops GreptimeDB
/// instances (standalone or distributed) plus any backing services
/// (Kafka WAL, etcd, PostgreSQL, MySQL).
#[derive(Clone)]
pub struct Env {
    /// Root directory for test data and server log files; exported as the
    /// `SQLNESS_HOME` environment variable on start.
    sqlness_home: PathBuf,
    /// Pre-set server addresses. When the gRPC address is present, tests
    /// connect to an existing deployment instead of spawning processes.
    server_addrs: ServerAddr,
    wal: WalConfig,

    /// Directory holding the `greptime` binary currently in use; filled by
    /// `build_db` when not supplied up front.
    bins_dir: Arc<Mutex<Option<PathBuf>>>,
    /// Binary directories keyed by version tag; seeded with "latest".
    versioned_bins_dirs: Arc<Mutex<HashMap<String, PathBuf>>>,
    /// Download a versioned binary on demand when it is not found locally.
    pull_version_on_need: bool,
    store_config: StoreConfig,
    /// Extra command-line arguments appended to server invocations.
    extra_args: Vec<String>,
}
101
#[async_trait]
impl EnvController for Env {
    type DB = GreptimeDB;

    /// Starts a database for the given test `mode` ("standalone" or
    /// "distributed"). `id` distinguishes parallel test instances.
    ///
    /// # Panics
    /// Panics for `id > 0` when a fixed server address is configured
    /// (a single external deployment cannot serve parallel instances),
    /// and for any unknown `mode`.
    async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
        if self.server_addrs.server_addr.is_some() && id > 0 {
            panic!("Parallel test mode is not supported when server address is already set.");
        }

        // `set_var` is `unsafe` on newer editions: callers must ensure no
        // other thread reads the environment concurrently — assumed to hold
        // at startup here (NOTE(review): confirm).
        unsafe {
            std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string());
        }
        match mode {
            "standalone" => self.start_standalone(id).await,
            "distributed" => self.start_distributed(id).await,
            _ => panic!("Unexpected mode: {mode}"),
        }
    }

    /// Stops every server process owned by `database`.
    async fn stop(&self, _mode: &str, mut database: Self::DB) {
        database.stop();
    }
}
126
127impl Env {
128 pub fn new(
129 data_home: PathBuf,
130 server_addrs: ServerAddr,
131 wal: WalConfig,
132 pull_version_on_need: bool,
133 bins_dir: Option<PathBuf>,
134 store_config: StoreConfig,
135 extra_args: Vec<String>,
136 ) -> Self {
137 Self {
138 sqlness_home: data_home,
139 server_addrs,
140 wal,
141 pull_version_on_need,
142 bins_dir: Arc::new(Mutex::new(bins_dir.clone())),
143 versioned_bins_dirs: Arc::new(Mutex::new(HashMap::from_iter([(
144 "latest".to_string(),
145 bins_dir.clone().unwrap_or(util::get_binary_dir("debug")),
146 )]))),
147 store_config,
148 extra_args,
149 }
150 }
151
    /// Starts (or, in remote mode, connects to) a standalone instance.
    async fn start_standalone(&self, id: usize) -> GreptimeDB {
        println!("Starting standalone instance id: {id}");

        if self.server_addrs.server_addr.is_some() {
            // Remote mode: reuse the externally provided deployment.
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            // Record the randomized mode so a later restart can reuse the
            // exact same addresses.
            let server_mode = ServerMode::random_standalone();
            db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX);
            let server_addr = server_mode.server_addr().unwrap();
            let server_process = self.start_server(server_mode, &db_ctx, id, true).await;

            // `connect_db` returns a placeholder handle; attach the spawned
            // process and the real context to it.
            let mut greptimedb = self.connect_db(&server_addr, id).await;
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process])));
            greptimedb.is_standalone = true;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }
175
    /// Starts (or, in remote mode, connects to) a distributed cluster:
    /// one metasrv, three datanodes, one frontend and one flownode.
    async fn start_distributed(&self, id: usize) -> GreptimeDB {
        if self.server_addrs.server_addr.is_some() {
            // Remote mode: reuse the externally provided deployment.
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            self.setup_etcd();
            self.setup_pg();
            self.setup_mysql().await;
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            // Metasrv first: every other node is constructed with its gRPC
            // port so they can register against it.
            let meta_server_mode = ServerMode::random_metasrv();
            let metasrv_port = match &meta_server_mode {
                ServerMode::Metasrv {
                    rpc_server_addr, ..
                } => rpc_server_addr
                    .split(':')
                    .nth(1)
                    .unwrap()
                    .parse::<u16>()
                    .unwrap(),
                _ => panic!(
                    "metasrv mode not set, maybe running in remote mode which doesn't support restart?"
                ),
            };
            db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX);
            let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await;

            // Three datanodes, recorded at consecutive slots so that
            // `restart_server` can relaunch them with the same modes.
            let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0);
            db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX);
            let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await;
            let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1);
            db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1);
            let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await;
            let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2);
            db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2);
            let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await;

            // Clients connect to the frontend, so its address is the one
            // handed to `connect_db`.
            let frontend_mode = ServerMode::random_frontend(metasrv_port);
            let server_addr = frontend_mode.server_addr().unwrap();
            db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX);
            let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await;

            let flownode_mode = ServerMode::random_flownode(metasrv_port, 0);
            db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX);
            let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;

            // Attach all spawned processes to the handle so `stop` /
            // `restart_server` can manage them.
            greptimedb.metasrv_process = Some(meta_server).into();
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![
                datanode_1, datanode_2, datanode_3,
            ])));
            greptimedb.frontend_process = Some(frontend).into();
            greptimedb.flownode_process = Some(flownode).into();
            greptimedb.is_standalone = false;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }
238
    /// Builds a `GreptimeDB` handle with live gRPC, PostgreSQL and MySQL
    /// connections to `server_addr`.
    ///
    /// The returned handle owns no processes and carries a placeholder
    /// context; callers that spawn servers overwrite those fields.
    ///
    /// # Panics
    /// Panics if any of the three protocol addresses is unset.
    async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB {
        let grpc_server_addr = server_addr.server_addr.as_ref().unwrap();
        let pg_server_addr = server_addr.pg_server_addr.as_ref().unwrap();
        let mysql_server_addr = server_addr.mysql_server_addr.as_ref().unwrap();

        let client =
            MultiProtocolClient::connect(grpc_server_addr, pg_server_addr, mysql_server_addr).await;
        GreptimeDB {
            client: TokioMutex::new(client),
            server_processes: None,
            metasrv_process: None.into(),
            frontend_process: None.into(),
            flownode_process: None.into(),
            active_bins_dir: Mutex::new(self.bins_dir.lock().unwrap().clone()),
            // Placeholder context (time 0, no recorded modes); replaced by
            // callers that actually start servers.
            ctx: GreptimeDBContext {
                time: 0,
                datanode_id: Default::default(),
                wal: self.wal.clone(),
                store_config: self.store_config.clone(),
                server_modes: Vec::new(),
            },
            is_standalone: false,
            env: self.clone(),
            id,
        }
    }
265
266 fn stop_server(process: &mut Child) {
267 let _ = process.kill();
268 let _ = process.wait();
269 }
270
    /// Spawns one server of the given `mode` using the environment's
    /// currently configured binary directory.
    ///
    /// # Panics
    /// Panics when no binary directory is set (pass one in, or call
    /// `build_db` first).
    async fn start_server(
        &self,
        mode: ServerMode,
        db_ctx: &GreptimeDBContext,
        id: usize,
        truncate_log: bool,
    ) -> Child {
        let bins_dir = self.bins_dir.lock().unwrap().clone().expect(
            "GreptimeDB binary is not available. Please pass in the path to the directory that contains the pre-built GreptimeDB binary. Or you may call `self.build_db()` beforehand.",
        );

        self.start_server_with_bins_dir(mode, db_ctx, id, truncate_log, bins_dir)
            .await
    }
285
    /// Spawns one server of `mode` from `bins_dir`, redirecting its stdout
    /// to a per-instance log file, and waits (up to 30s per address) until
    /// every port the mode advertises accepts connections.
    ///
    /// # Panics
    /// Panics when a required port is already in use, when the binary
    /// cannot be spawned, or when the server does not come up in time.
    async fn start_server_with_bins_dir(
        &self,
        mode: ServerMode,
        db_ctx: &GreptimeDBContext,
        id: usize,
        truncate_log: bool,
        bins_dir: PathBuf,
    ) -> Child {
        // One log file per (instance id, role); datanodes also encode their
        // node id. Starting a datanode bumps the context's counter.
        let log_file_name = match mode {
            ServerMode::Datanode { node_id, .. } => {
                db_ctx.incr_datanode_id();
                format!("greptime-{}-sqlness-datanode-{}.log", id, node_id)
            }
            ServerMode::Flownode { .. } => format!("greptime-{}-sqlness-flownode.log", id),
            ServerMode::Frontend { .. } => format!("greptime-{}-sqlness-frontend.log", id),
            ServerMode::Metasrv { .. } => format!("greptime-{}-sqlness-metasrv.log", id),
            ServerMode::Standalone { .. } => format!("greptime-{}-sqlness-standalone.log", id),
        };
        let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string();

        println!("DB instance {id} log file at {stdout_file_name}");

        // Fresh starts truncate the log; restarts append so history from
        // the previous run is preserved.
        let stdout_file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(truncate_log)
            .append(!truncate_log)
            .open(&stdout_file_name)
            .unwrap();

        let args = mode.get_args(&self.sqlness_home, self, db_ctx, id);
        let check_ip_addrs = mode.check_addrs();

        // Fail fast if any port this server needs is already taken.
        for check_ip_addr in &check_ip_addrs {
            if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await {
                panic!(
                    "Port {check_ip_addr} is already in use, please check and retry.",
                    check_ip_addr = check_ip_addr
                );
            }
        }

        let program = PROGRAM;

        let abs_bins_dir = bins_dir
            .canonicalize()
            .expect("Failed to canonicalize bins_dir");

        // TZ pinned to UTC — presumably so timestamps in expected results
        // are host-independent (NOTE(review): confirm).
        let mut process = Command::new(abs_bins_dir.join(program))
            .current_dir(bins_dir.clone())
            .env("TZ", "UTC")
            .args(args)
            .stdout(stdout_file)
            .spawn()
            .unwrap_or_else(|error| {
                panic!(
                    "Failed to start the DB with subcommand {}, Error: {error}, path: {:?}",
                    mode.name(),
                    bins_dir.join(program)
                );
            });

        // Wait for every advertised address to come up; kill the process
        // and bail out if one doesn't within 30 seconds.
        for check_ip_addr in &check_ip_addrs {
            if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(30)).await {
                Env::stop_server(&mut process);
                panic!(
                    "{} doesn't up in 30 seconds, check {} for more details.",
                    mode.name(),
                    stdout_file_name
                )
            }
        }

        process
    }
361
    /// Restarts the database's server processes, reusing the server modes
    /// recorded at startup (same addresses/configs) and the active binary
    /// directory (which may have been switched to another version).
    ///
    /// With `is_full_restart` the metasrv and frontend are bounced too;
    /// otherwise only the datanodes (or the standalone server) and the
    /// flownode are restarted.
    async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) {
        let bins_dir = db.active_bins_dir.lock().unwrap().clone().expect(
            "GreptimeDB binary is not available. Please pass in the path to the directory that contains the pre-built GreptimeDB binary. Or you may call `self.build_db()` beforehand.",
        );

        // Phase 1: stop everything that is going to be restarted.
        {
            if let Some(server_process) = db.server_processes.clone() {
                let mut server_processes = server_process.lock().unwrap();
                for server_process in server_processes.iter_mut() {
                    Env::stop_server(server_process);
                }
            }

            if is_full_restart {
                if let Some(mut metasrv_process) =
                    db.metasrv_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut metasrv_process);
                }
                if let Some(mut frontend_process) =
                    db.frontend_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut frontend_process);
                }
            }

            // The flownode is restarted in both modes.
            if let Some(mut flownode_process) =
                db.flownode_process.lock().expect("poisoned lock").take()
            {
                Env::stop_server(&mut flownode_process);
            }
        }

        // Phase 2: relaunch from the recorded modes.
        let new_server_processes = if db.is_standalone {
            let server_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_STANDALONE_IDX)
                .cloned()
                .unwrap();
            let server_addr = server_mode.server_addr().unwrap();
            let new_server_process = self
                .start_server_with_bins_dir(server_mode, &db.ctx, db.id, false, bins_dir.clone())
                .await;

            // The old connections died with the old process; reconnect the
            // MySQL and PostgreSQL clients to the fresh server.
            let mut client = db.client.lock().await;
            client
                .reconnect_mysql_client(&server_addr.mysql_server_addr.unwrap())
                .await;
            client
                .reconnect_pg_client(&server_addr.pg_server_addr.unwrap())
                .await;
            vec![new_server_process]
        } else {
            // Restart the datanode counter so relaunched datanodes are
            // numbered from zero again.
            db.ctx.reset_datanode_id();
            if is_full_restart {
                let metasrv_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_METASRV_IDX)
                    .cloned()
                    .unwrap();
                let metasrv = self
                    .start_server_with_bins_dir(
                        metasrv_mode,
                        &db.ctx,
                        db.id,
                        false,
                        bins_dir.clone(),
                    )
                    .await;
                db.metasrv_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(metasrv);

                // Give the restarted metasrv time to become ready before
                // dependent nodes start registering against it.
                tokio::time::sleep(Duration::from_secs(5)).await;
            }

            let mut processes = vec![];
            for i in 0..3 {
                let datanode_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_DATANODE_START_IDX + i)
                    .cloned()
                    .unwrap();
                let new_server_process = self
                    .start_server_with_bins_dir(
                        datanode_mode,
                        &db.ctx,
                        db.id,
                        false,
                        bins_dir.clone(),
                    )
                    .await;
                processes.push(new_server_process);
            }

            if is_full_restart {
                let frontend_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_FRONTEND_IDX)
                    .cloned()
                    .unwrap();
                let frontend = self
                    .start_server_with_bins_dir(
                        frontend_mode,
                        &db.ctx,
                        db.id,
                        false,
                        bins_dir.clone(),
                    )
                    .await;
                db.frontend_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(frontend);
            }

            let flownode_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_FLOWNODE_IDX)
                .cloned()
                .unwrap();
            let flownode = self
                .start_server_with_bins_dir(flownode_mode, &db.ctx, db.id, false, bins_dir.clone())
                .await;
            db.flownode_process
                .lock()
                .expect("lock poisoned")
                .replace(flownode);

            processes
        };

        // Phase 3: publish the new datanode/standalone process handles.
        if let Some(server_processes) = db.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            *server_processes = new_server_processes;
        }
    }
504
505 fn setup_wal(&self) {
507 if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
508 util::setup_wal();
509 }
510 }
511
512 fn setup_etcd(&self) {
514 if self.store_config.setup_etcd {
515 let client_ports = self
516 .store_config
517 .store_addrs
518 .iter()
519 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
520 .collect::<Vec<_>>();
521 util::setup_etcd(client_ports, None, None);
522 }
523 }
524
525 fn setup_pg(&self) {
527 if matches!(self.store_config.setup_pg, Some(ServiceProvider::Create)) {
528 let client_ports = self
529 .store_config
530 .store_addrs
531 .iter()
532 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
533 .collect::<Vec<_>>();
534 let client_port = client_ports.first().unwrap_or(&5432);
535 util::setup_pg(*client_port, None);
536 }
537 }
538
539 async fn setup_mysql(&self) {
541 if matches!(self.store_config.setup_mysql, Some(ServiceProvider::Create)) {
542 let client_ports = self
543 .store_config
544 .store_addrs
545 .iter()
546 .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
547 .collect::<Vec<_>>();
548 let client_port = client_ports.first().unwrap_or(&3306);
549 util::setup_mysql(*client_port, None);
550
551 tokio::time::sleep(Duration::from_secs(10)).await;
553 }
554 }
555
556 fn build_db(&self) {
558 let mut bins_dir = self.bins_dir.lock().unwrap();
559 if bins_dir.is_some() {
560 return;
561 }
562
563 println!("Going to build the DB...");
564 let output = Command::new("cargo")
565 .current_dir(util::get_workspace_root())
566 .args([
567 "build",
568 "--bin",
569 "greptime",
570 "--features",
571 "pg_kvbackend,mysql_kvbackend,vector_index",
572 ])
573 .output()
574 .expect("Failed to start GreptimeDB");
575 if !output.status.success() {
576 println!("Failed to build GreptimeDB, {}", output.status);
577 println!("Cargo build stdout:");
578 io::stdout().write_all(&output.stdout).unwrap();
579 println!("Cargo build stderr:");
580 io::stderr().write_all(&output.stderr).unwrap();
581 panic!();
582 }
583
584 bins_dir.replace(util::get_binary_dir("debug"));
585 }
586
    /// Extra command-line arguments to append to server invocations.
    pub(crate) fn extra_args(&self) -> &Vec<String> {
        &self.extra_args
    }
590}
591
/// A running GreptimeDB instance under test: its child processes (if
/// owned), protocol clients, and the context needed to restart it.
pub struct GreptimeDB {
    /// The standalone server or, in distributed mode, the datanodes.
    /// `None` in remote mode where no processes are owned.
    server_processes: Option<Arc<Mutex<Vec<Child>>>>,
    metasrv_process: Mutex<Option<Child>>,
    frontend_process: Mutex<Option<Child>>,
    flownode_process: Mutex<Option<Child>>,
    /// gRPC/PostgreSQL/MySQL client bundle; an async mutex because it is
    /// held across awaits during queries and reconnects.
    client: TokioMutex<MultiProtocolClient>,
    /// Binary directory used for restarts; swapped when a test selects a
    /// different version via the "version" context key.
    active_bins_dir: Mutex<Option<PathBuf>>,
    ctx: GreptimeDBContext,
    is_standalone: bool,
    env: Env,
    /// Parallel-test instance id.
    id: usize,
}
604
impl GreptimeDB {
    /// Runs `query` over the PostgreSQL protocol; errors are rendered via
    /// their own `Display` implementation.
    async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.postgres_query(&query).await {
            Ok(rows) => Box::new(PostgresqlFormatter::from(rows)),
            Err(e) => Box::new(e),
        }
    }

    /// Runs `query` over the MySQL protocol.
    async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.mysql_query(&query).await {
            Ok(res) => Box::new(MysqlFormatter::from(res)),
            Err(e) => Box::new(e),
        }
    }

    /// Runs `query` over gRPC (the default protocol — see `Database::query`).
    async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.grpc_query(&query).await {
            Ok(rows) => Box::new(OutputFormatter::from(rows)),
            Err(e) => Box::new(ErrorFormatter::from(e)),
        }
    }
}
633
#[async_trait]
impl Database for GreptimeDB {
    /// Executes `query`, honoring two special context keys first:
    /// "restart" (restart datanodes/standalone + flownode in place) and
    /// "version" (switch the active binary, pulling it if needed, then do
    /// a full restart). The protocol key selects MySQL or PostgreSQL;
    /// without it, gRPC is used.
    async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
        // Restarts only make sense when we own the processes, i.e. when no
        // fixed external address was configured.
        if ctx.context.contains_key("restart") && self.env.server_addrs.server_addr.is_none() {
            self.env.restart_server(self, false).await;
        } else if let Some(version) = ctx.context.get("version") {
            let version_bin_dir = self
                .env
                .versioned_bins_dirs
                .lock()
                .expect("lock poison")
                .get(version.as_str())
                .cloned();

            match version_bin_dir {
                Some(path) if path.join(PROGRAM).is_file() => {
                    // The requested version's binary already exists locally.
                    *self.active_bins_dir.lock().unwrap() = Some(path);
                }
                _ => {
                    // Fetch the versioned binary if allowed, then point the
                    // active directory at `<workspace root>/<version>`.
                    maybe_pull_binary(version, self.env.pull_version_on_need).await;
                    let root = get_workspace_root();
                    let new_path = PathBuf::from_iter([&root, version]);
                    *self.active_bins_dir.lock().unwrap() = Some(new_path);
                }
            }

            // Full restart so every component runs the selected binary,
            // then give the cluster a moment to settle.
            self.env.restart_server(self, true).await;
            tokio::time::sleep(Duration::from_secs(5)).await;
        }

        if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
            if protocol == MYSQL {
                self.mysql_query(ctx, query).await
            } else {
                self.postgres_query(ctx, query).await
            }
        } else {
            self.grpc_query(ctx, query).await
        }
    }
}
679
impl GreptimeDB {
    /// Kills and reaps every owned server process (standalone/datanodes,
    /// metasrv, frontend, flownode), then tears down the Kafka cluster if
    /// one was created for the WAL. Safe to call when nothing is owned.
    fn stop(&mut self) {
        if let Some(server_processes) = self.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            for mut server_process in server_processes.drain(..) {
                Env::stop_server(&mut server_process);
                println!(
                    "Standalone or Datanode (pid = {}) is stopped",
                    server_process.id()
                );
            }
        }
        if let Some(mut metasrv) = self
            .metasrv_process
            .lock()
            .expect("someone else panic when holding lock")
            .take()
        {
            Env::stop_server(&mut metasrv);
            println!("Metasrv (pid = {}) is stopped", metasrv.id());
        }
        if let Some(mut frontend) = self
            .frontend_process
            .lock()
            .expect("someone else panic when holding lock")
            .take()
        {
            Env::stop_server(&mut frontend);
            println!("Frontend (pid = {}) is stopped", frontend.id());
        }
        if let Some(mut flownode) = self
            .flownode_process
            .lock()
            .expect("someone else panic when holding lock")
            .take()
        {
            Env::stop_server(&mut flownode);
            println!("Flownode (pid = {}) is stopped", flownode.id());
        }
        // Only tear down Kafka when this run created it (mirrors the
        // condition in `Env::setup_wal`).
        if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
        {
            util::teardown_wal();
        }
    }
}
725
726impl Drop for GreptimeDB {
727 fn drop(&mut self) {
728 if self.env.server_addrs.server_addr.is_none() {
729 self.stop();
730 }
731 }
732}
733
/// Per-run state shared by the servers of one GreptimeDB instance.
pub struct GreptimeDBContext {
    /// Creation time in milliseconds since the epoch (see `time()`);
    /// consumed outside this file — presumably to namespace data dirs
    /// (TODO confirm at call sites).
    time: i64,
    /// Counter bumped each time a datanode process is started.
    datanode_id: AtomicU32,
    wal: WalConfig,
    store_config: StoreConfig,
    /// Server modes recorded at startup, indexed by the
    /// `SERVER_MODE_*_IDX` constants, so restarts reuse the same configs.
    server_modes: Vec<ServerMode>,
}
742
impl GreptimeDBContext {
    /// Creates a context stamped with the current time and an empty set of
    /// recorded server modes.
    pub fn new(wal: WalConfig, store_config: StoreConfig) -> Self {
        Self {
            time: common_time::util::current_time_millis(),
            datanode_id: AtomicU32::new(0),
            wal,
            store_config,
            server_modes: Vec::new(),
        }
    }

    /// Creation timestamp (milliseconds since the epoch).
    pub(crate) fn time(&self) -> i64 {
        self.time
    }

    /// Whether the WAL is backed by the local raft-engine.
    pub fn is_raft_engine(&self) -> bool {
        matches!(self.wal, WalConfig::RaftEngine)
    }

    /// Kafka broker endpoints serialized as a JSON array string; an empty
    /// string for a raft-engine WAL.
    pub fn kafka_wal_broker_endpoints(&self) -> String {
        match &self.wal {
            WalConfig::RaftEngine => String::new(),
            WalConfig::Kafka {
                broker_endpoints, ..
            } => serde_json::to_string(&broker_endpoints).unwrap(),
        }
    }

    /// Bumps the datanode counter; called whenever a datanode process is
    /// started. Readers are outside this file — presumably via `get_args`
    /// (TODO confirm).
    fn incr_datanode_id(&self) {
        let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed);
    }

    /// Resets the datanode counter before a cluster restart so relaunched
    /// datanodes count from zero again.
    fn reset_datanode_id(&self) {
        self.datanode_id.store(0, Ordering::Relaxed);
    }

    /// Clone of the metadata-store configuration.
    pub(crate) fn store_config(&self) -> StoreConfig {
        self.store_config.clone()
    }

    /// Records `mode` at slot `idx`, growing the vector as needed, so the
    /// mode can be retrieved again on restart.
    fn set_server_mode(&mut self, mode: ServerMode, idx: usize) {
        if idx >= self.server_modes.len() {
            self.server_modes.resize(idx + 1, mode.clone());
        }
        self.server_modes[idx] = mode;
    }

    /// Returns the server mode previously recorded at `idx`, if any.
    fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> {
        self.server_modes.get(idx)
    }
}