sqlness_runner/env/bare.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Display;
use std::fs::OpenOptions;
use std::io;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Child, Command};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use sqlness::{Database, EnvController, QueryContext};
use tokio::sync::Mutex as TokioMutex;

use crate::client::MultiProtocolClient;
use crate::cmd::bare::ServerAddr;
use crate::formatter::{ErrorFormatter, MysqlFormatter, OutputFormatter, PostgresqlFormatter};
use crate::protocol_interceptor::{MYSQL, PROTOCOL_KEY};
use crate::server_mode::ServerMode;
use crate::util;
use crate::util::{PROGRAM, get_workspace_root, maybe_pull_binary};

// Standalone mode: the single server occupies slot 0 in `GreptimeDBContext::server_modes`.
const SERVER_MODE_STANDALONE_IDX: usize = 0;
// Distributed mode: metasrv at slot 0, three datanodes at slots 1..=3,
// frontend at slot 4 and flownode at slot 5.
const SERVER_MODE_METASRV_IDX: usize = 0;
const SERVER_MODE_DATANODE_START_IDX: usize = 1;
const SERVER_MODE_FRONTEND_IDX: usize = 4;
const SERVER_MODE_FLOWNODE_IDX: usize = 5;

#[derive(Clone)]
pub enum WalConfig {
    RaftEngine,
    Kafka {
        /// Whether the runner needs to start a Kafka cluster itself
        /// (one may already be provided by the external environment).
        needs_kafka_cluster: bool,
        broker_endpoints: Vec<String>,
    },
}

#[derive(Debug, Clone)]
pub(crate) enum ServiceProvider {
    Create,
    External(String),
}

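// An empty address string means the runner should create the service itself;
// any non-empty value is treated as the address of an externally managed instance.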
impl From<&str> for ServiceProvider {
    fn from(value: &str) -> Self {
        if value.is_empty() {
            Self::Create
        } else {
            Self::External(value.to_string())
        }
    }
}

#[derive(Clone)]
pub struct StoreConfig {
    pub store_addrs: Vec<String>,
    pub setup_etcd: bool,
    pub(crate) setup_pg: Option<ServiceProvider>,
    pub(crate) setup_mysql: Option<ServiceProvider>,
    pub enable_flat_format: bool,
}

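/// Controls how GreptimeDB instances are built, started, restarted and torn down
/// for the sqlness test runs.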
#[derive(Clone)]
pub struct Env {
    sqlness_home: PathBuf,
    server_addrs: ServerAddr,
    wal: WalConfig,

    /// The path to the directory that contains the pre-built GreptimeDB binary.
    /// When running in CI, this is expected to be set.
    /// If not set, the runner builds the GreptimeDB binary itself when needed and fills in this field afterwards.
    bins_dir: Arc<Mutex<Option<PathBuf>>>,
    /// Paths to directories that contain pre-built binaries of older GreptimeDB versions, keyed by version.
    versioned_bins_dirs: Arc<Mutex<HashMap<String, PathBuf>>>,
    /// Whether to pull other versions of GreptimeDB on demand.
    pull_version_on_need: bool,
    /// Store configuration for metasrv metadata.
    store_config: StoreConfig,
    /// Extra command line arguments when starting GreptimeDB binaries.
    extra_args: Vec<String>,
}

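// `Env` implements sqlness's `EnvController`: `start` brings up a standalone or
// distributed GreptimeDB for a test run, and `stop` tears it down afterwards.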
#[async_trait]
impl EnvController for Env {
    type DB = GreptimeDB;

    async fn start(&self, mode: &str, id: usize, _config: Option<&Path>) -> Self::DB {
        if self.server_addrs.server_addr.is_some() && id > 0 {
            panic!("Parallel test mode is not supported when server address is already set.");
        }

        unsafe {
            std::env::set_var("SQLNESS_HOME", self.sqlness_home.display().to_string());
        }
        match mode {
            "standalone" => self.start_standalone(id).await,
            "distributed" => self.start_distributed(id).await,
            _ => panic!("Unexpected mode: {mode}"),
        }
    }

    /// Stop one [`Database`].
    async fn stop(&self, _mode: &str, mut database: Self::DB) {
        database.stop();
    }
}

impl Env {
    pub fn new(
        data_home: PathBuf,
        server_addrs: ServerAddr,
        wal: WalConfig,
        pull_version_on_need: bool,
        bins_dir: Option<PathBuf>,
        store_config: StoreConfig,
        extra_args: Vec<String>,
    ) -> Self {
        Self {
            sqlness_home: data_home,
            server_addrs,
            wal,
            pull_version_on_need,
            bins_dir: Arc::new(Mutex::new(bins_dir.clone())),
            versioned_bins_dirs: Arc::new(Mutex::new(HashMap::from_iter([(
                "latest".to_string(),
                bins_dir.clone().unwrap_or(util::get_binary_dir("debug")),
            )]))),
            store_config,
            extra_args,
        }
    }

    async fn start_standalone(&self, id: usize) -> GreptimeDB {
        println!("Starting standalone instance id: {id}");

        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            let server_mode = ServerMode::random_standalone();
            db_ctx.set_server_mode(server_mode.clone(), SERVER_MODE_STANDALONE_IDX);
            let server_addr = server_mode.server_addr().unwrap();
            let server_process = self.start_server(server_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![server_process])));
            greptimedb.is_standalone = true;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

    async fn start_distributed(&self, id: usize) -> GreptimeDB {
        if self.server_addrs.server_addr.is_some() {
            self.connect_db(&self.server_addrs, id).await
        } else {
            self.build_db();
            self.setup_wal();
            self.setup_etcd();
            self.setup_pg();
            self.setup_mysql().await;
            let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());

            // start a distributed GreptimeDB
            let meta_server_mode = ServerMode::random_metasrv();
            let metasrv_port = match &meta_server_mode {
                ServerMode::Metasrv {
                    rpc_server_addr, ..
                } => rpc_server_addr
                    .split(':')
                    .nth(1)
                    .unwrap()
                    .parse::<u16>()
                    .unwrap(),
                _ => panic!(
                    "metasrv mode not set, maybe running in remote mode which doesn't support restart?"
                ),
            };
            db_ctx.set_server_mode(meta_server_mode.clone(), SERVER_MODE_METASRV_IDX);
            let meta_server = self.start_server(meta_server_mode, &db_ctx, id, true).await;

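            // Start three datanodes, recording each one's `ServerMode` at consecutive
            // indices so that `restart_server` can bring them back on the same ports.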
            let datanode_1_mode = ServerMode::random_datanode(metasrv_port, 0);
            db_ctx.set_server_mode(datanode_1_mode.clone(), SERVER_MODE_DATANODE_START_IDX);
            let datanode_1 = self.start_server(datanode_1_mode, &db_ctx, id, true).await;
            let datanode_2_mode = ServerMode::random_datanode(metasrv_port, 1);
            db_ctx.set_server_mode(datanode_2_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 1);
            let datanode_2 = self.start_server(datanode_2_mode, &db_ctx, id, true).await;
            let datanode_3_mode = ServerMode::random_datanode(metasrv_port, 2);
            db_ctx.set_server_mode(datanode_3_mode.clone(), SERVER_MODE_DATANODE_START_IDX + 2);
            let datanode_3 = self.start_server(datanode_3_mode, &db_ctx, id, true).await;

            let frontend_mode = ServerMode::random_frontend(metasrv_port);
            let server_addr = frontend_mode.server_addr().unwrap();
            db_ctx.set_server_mode(frontend_mode.clone(), SERVER_MODE_FRONTEND_IDX);
            let frontend = self.start_server(frontend_mode, &db_ctx, id, true).await;

            let flownode_mode = ServerMode::random_flownode(metasrv_port, 0);
            db_ctx.set_server_mode(flownode_mode.clone(), SERVER_MODE_FLOWNODE_IDX);
            let flownode = self.start_server(flownode_mode, &db_ctx, id, true).await;

            let mut greptimedb = self.connect_db(&server_addr, id).await;

            greptimedb.metasrv_process = Some(meta_server).into();
            greptimedb.server_processes = Some(Arc::new(Mutex::new(vec![
                datanode_1, datanode_2, datanode_3,
            ])));
            greptimedb.frontend_process = Some(frontend).into();
            greptimedb.flownode_process = Some(flownode).into();
            greptimedb.is_standalone = false;
            greptimedb.ctx = db_ctx;

            greptimedb
        }
    }

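    // Connect gRPC, PostgreSQL and MySQL clients to the given addresses; all three
    // address fields are expected to be set.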
    async fn connect_db(&self, server_addr: &ServerAddr, id: usize) -> GreptimeDB {
        let grpc_server_addr = server_addr.server_addr.as_ref().unwrap();
        let pg_server_addr = server_addr.pg_server_addr.as_ref().unwrap();
        let mysql_server_addr = server_addr.mysql_server_addr.as_ref().unwrap();

        let client =
            MultiProtocolClient::connect(grpc_server_addr, pg_server_addr, mysql_server_addr).await;
        GreptimeDB {
            client: TokioMutex::new(client),
            server_processes: None,
            metasrv_process: None.into(),
            frontend_process: None.into(),
            flownode_process: None.into(),
            ctx: GreptimeDBContext {
                time: 0,
                datanode_id: Default::default(),
                wal: self.wal.clone(),
                store_config: self.store_config.clone(),
                server_modes: Vec::new(),
            },
            is_standalone: false,
            env: self.clone(),
            id,
        }
    }

    fn stop_server(process: &mut Child) {
        let _ = process.kill();
        let _ = process.wait();
    }

    async fn start_server(
        &self,
        mode: ServerMode,
        db_ctx: &GreptimeDBContext,
        id: usize,
        truncate_log: bool,
    ) -> Child {
        let log_file_name = match mode {
            ServerMode::Datanode { node_id, .. } => {
                db_ctx.incr_datanode_id();
                format!("greptime-{}-sqlness-datanode-{}.log", id, node_id)
            }
            ServerMode::Flownode { .. } => format!("greptime-{}-sqlness-flownode.log", id),
            ServerMode::Frontend { .. } => format!("greptime-{}-sqlness-frontend.log", id),
            ServerMode::Metasrv { .. } => format!("greptime-{}-sqlness-metasrv.log", id),
            ServerMode::Standalone { .. } => format!("greptime-{}-sqlness-standalone.log", id),
        };
        let stdout_file_name = self.sqlness_home.join(log_file_name).display().to_string();

        println!("DB instance {id} log file at {stdout_file_name}");

        let stdout_file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(truncate_log)
            .append(!truncate_log)
            .open(&stdout_file_name)
            .unwrap();

        let args = mode.get_args(&self.sqlness_home, self, db_ctx, id);
        let check_ip_addrs = mode.check_addrs();

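        // Fail fast if any address this server is about to bind is already occupied.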
        for check_ip_addr in &check_ip_addrs {
            if util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(1)).await {
                panic!("Port {check_ip_addr} is already in use, please check and retry.");
            }
        }

        let program = PROGRAM;

        let bins_dir = self.bins_dir.lock().unwrap().clone().expect(
            "GreptimeDB binary is not available. Please pass in the path to the directory that contains the pre-built GreptimeDB binary. Or you may call `self.build_db()` beforehand.",
        );

        let abs_bins_dir = bins_dir
            .canonicalize()
            .expect("Failed to canonicalize bins_dir");

        let mut process = Command::new(abs_bins_dir.join(program))
            .current_dir(bins_dir.clone())
            .env("TZ", "UTC")
            .args(args)
            .stdout(stdout_file)
            .spawn()
            .unwrap_or_else(|error| {
                panic!(
                    "Failed to start the DB with subcommand {}, Error: {error}, path: {:?}",
                    mode.name(),
                    bins_dir.join(program)
                );
            });

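        // Wait up to 30 seconds for every advertised address to become reachable;
        // otherwise kill the half-started process and abort with a pointer to its log.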
        for check_ip_addr in &check_ip_addrs {
            if !util::check_port(check_ip_addr.parse().unwrap(), Duration::from_secs(30)).await {
                Env::stop_server(&mut process);
                panic!(
                    "{} didn't come up within 30 seconds, check {} for more details.",
                    mode.name(),
                    stdout_file_name
                )
            }
        }

        process
    }

    /// Stop and restart the server processes.
    async fn restart_server(&self, db: &GreptimeDB, is_full_restart: bool) {
        {
            if let Some(server_process) = db.server_processes.clone() {
                let mut server_processes = server_process.lock().unwrap();
                for server_process in server_processes.iter_mut() {
                    Env::stop_server(server_process);
                }
            }

            if is_full_restart {
                if let Some(mut metasrv_process) =
                    db.metasrv_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut metasrv_process);
                }
                if let Some(mut frontend_process) =
                    db.frontend_process.lock().expect("poisoned lock").take()
                {
                    Env::stop_server(&mut frontend_process);
                }
            }

            if let Some(mut flownode_process) =
                db.flownode_process.lock().expect("poisoned lock").take()
            {
                Env::stop_server(&mut flownode_process);
            }
        }

        // Check whether the DB is standalone or distributed, then restart it reusing
        // the `ServerMode`s recorded at startup so addresses and ports stay the same.
        let new_server_processes = if db.is_standalone {
            let server_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_STANDALONE_IDX)
                .cloned()
                .unwrap();
            let server_addr = server_mode.server_addr().unwrap();
            let new_server_process = self.start_server(server_mode, &db.ctx, db.id, false).await;

            let mut client = db.client.lock().await;
            client
                .reconnect_mysql_client(&server_addr.mysql_server_addr.unwrap())
                .await;
            client
                .reconnect_pg_client(&server_addr.pg_server_addr.unwrap())
                .await;
            vec![new_server_process]
        } else {
            db.ctx.reset_datanode_id();
            if is_full_restart {
                let metasrv_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_METASRV_IDX)
                    .cloned()
                    .unwrap();
                let metasrv = self.start_server(metasrv_mode, &db.ctx, db.id, false).await;
                db.metasrv_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(metasrv);

                // Wait for metasrv to start: older versions of the DB may take
                // longer to complete leader election.
                tokio::time::sleep(Duration::from_secs(5)).await;
            }

            let mut processes = vec![];
            for i in 0..3 {
                let datanode_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_DATANODE_START_IDX + i)
                    .cloned()
                    .unwrap();
                let new_server_process = self
                    .start_server(datanode_mode, &db.ctx, db.id, false)
                    .await;
                processes.push(new_server_process);
            }

            if is_full_restart {
                let frontend_mode = db
                    .ctx
                    .get_server_mode(SERVER_MODE_FRONTEND_IDX)
                    .cloned()
                    .unwrap();
                let frontend = self
                    .start_server(frontend_mode, &db.ctx, db.id, false)
                    .await;
                db.frontend_process
                    .lock()
                    .expect("lock poisoned")
                    .replace(frontend);
            }

            let flownode_mode = db
                .ctx
                .get_server_mode(SERVER_MODE_FLOWNODE_IDX)
                .cloned()
                .unwrap();
            let flownode = self
                .start_server(flownode_mode, &db.ctx, db.id, false)
                .await;
            db.flownode_process
                .lock()
                .expect("lock poisoned")
                .replace(flownode);

            processes
        };

        if let Some(server_processes) = db.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            *server_processes = new_server_processes;
        }
    }

    /// Set up the Kafka WAL cluster if needed. The teardown counterpart is in [`GreptimeDB::stop`].
    fn setup_wal(&self) {
        if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
            util::setup_wal();
        }
    }

    /// Set up etcd if needed.
    fn setup_etcd(&self) {
        if self.store_config.setup_etcd {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            util::setup_etcd(client_ports, None, None);
        }
    }

    /// Set up PostgreSQL if needed.
    fn setup_pg(&self) {
        if matches!(self.store_config.setup_pg, Some(ServiceProvider::Create)) {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&5432);
            util::setup_pg(*client_port, None);
        }
    }

    /// Set up MySQL if needed.
    async fn setup_mysql(&self) {
        if matches!(self.store_config.setup_mysql, Some(ServiceProvider::Create)) {
            let client_ports = self
                .store_config
                .store_addrs
                .iter()
                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
                .collect::<Vec<_>>();
            let client_port = client_ports.first().unwrap_or(&3306);
            util::setup_mysql(*client_port, None);

            // The MySQL Docker container starts slowly, so wait for a while.
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    }

    /// Build the DB with `cargo build --bin greptime`
    fn build_db(&self) {
        if self.bins_dir.lock().unwrap().is_some() {
            return;
        }

        println!("Going to build the DB...");
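        // Build with the PostgreSQL and MySQL kvbackend features enabled so the
        // metadata store backends configured via `StoreConfig` are available.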
        let output = Command::new("cargo")
            .current_dir(util::get_workspace_root())
            .args([
                "build",
                "--bin",
                "greptime",
                "--features",
                "pg_kvbackend,mysql_kvbackend",
            ])
            .output()
            .expect("Failed to execute `cargo build`");
        if !output.status.success() {
            println!("Failed to build GreptimeDB, {}", output.status);
            println!("Cargo build stdout:");
            io::stdout().write_all(&output.stdout).unwrap();
            println!("Cargo build stderr:");
            io::stderr().write_all(&output.stderr).unwrap();
            panic!();
        }

        let _ = self
            .bins_dir
            .lock()
            .unwrap()
            .insert(util::get_binary_dir("debug"));
    }

    pub(crate) fn extra_args(&self) -> &Vec<String> {
        &self.extra_args
    }
}

pub struct GreptimeDB {
    server_processes: Option<Arc<Mutex<Vec<Child>>>>,
    metasrv_process: Mutex<Option<Child>>,
    frontend_process: Mutex<Option<Child>>,
    flownode_process: Mutex<Option<Child>>,
    client: TokioMutex<MultiProtocolClient>,
    ctx: GreptimeDBContext,
    is_standalone: bool,
    env: Env,
    id: usize,
}

impl GreptimeDB {
    async fn postgres_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.postgres_query(&query).await {
            Ok(rows) => Box::new(PostgresqlFormatter::from(rows)),
            Err(e) => Box::new(e),
        }
    }

    async fn mysql_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.mysql_query(&query).await {
            Ok(res) => Box::new(MysqlFormatter::from(res)),
            Err(e) => Box::new(e),
        }
    }

    async fn grpc_query(&self, _ctx: QueryContext, query: String) -> Box<dyn Display> {
        let mut client = self.client.lock().await;

        match client.grpc_query(&query).await {
            Ok(rows) => Box::new(OutputFormatter::from(rows)),
            Err(e) => Box::new(ErrorFormatter::from(e)),
        }
    }
}

#[async_trait]
impl Database for GreptimeDB {
    async fn query(&self, ctx: QueryContext, query: String) -> Box<dyn Display> {
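        // A `restart` key in the query context triggers an in-place restart of the
        // server processes; a `version` key switches `bins_dir` to that version's
        // binary (pulling it if allowed) and performs a full restart.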
        if ctx.context.contains_key("restart") && self.env.server_addrs.server_addr.is_none() {
            self.env.restart_server(self, false).await;
        } else if let Some(version) = ctx.context.get("version") {
            let version_bin_dir = self
                .env
                .versioned_bins_dirs
                .lock()
                .expect("lock poison")
                .get(version.as_str())
                .cloned();

            match version_bin_dir {
                Some(path) if path.join(PROGRAM).is_file() => {
                    // use the binary recorded in `versioned_bins_dirs`
                    *self.env.bins_dir.lock().unwrap() = Some(path.clone());
                }
                _ => {
                    // otherwise pull the binary if allowed and use the versioned directory under the workspace root
                    maybe_pull_binary(version, self.env.pull_version_on_need).await;
                    let root = get_workspace_root();
                    let new_path = PathBuf::from_iter([&root, version]);
                    *self.env.bins_dir.lock().unwrap() = Some(new_path);
                }
            }

            self.env.restart_server(self, true).await;
            // sleep for a while to wait for the server to fully boot up
            tokio::time::sleep(Duration::from_secs(5)).await;
        }

        if let Some(protocol) = ctx.context.get(PROTOCOL_KEY) {
            // protocol is bound to be either "mysql" or "postgres"
            if protocol == MYSQL {
                self.mysql_query(ctx, query).await
            } else {
                self.postgres_query(ctx, query).await
            }
        } else {
            self.grpc_query(ctx, query).await
        }
    }
}

impl GreptimeDB {
    fn stop(&mut self) {
        if let Some(server_processes) = self.server_processes.clone() {
            let mut server_processes = server_processes.lock().unwrap();
            for mut server_process in server_processes.drain(..) {
                Env::stop_server(&mut server_process);
                println!(
                    "Standalone or Datanode (pid = {}) is stopped",
                    server_process.id()
                );
            }
        }
        if let Some(mut metasrv) = self
            .metasrv_process
            .lock()
            .expect("another thread panicked while holding the lock")
            .take()
        {
            Env::stop_server(&mut metasrv);
            println!("Metasrv (pid = {}) is stopped", metasrv.id());
        }
        if let Some(mut frontend) = self
            .frontend_process
            .lock()
            .expect("another thread panicked while holding the lock")
            .take()
        {
            Env::stop_server(&mut frontend);
            println!("Frontend (pid = {}) is stopped", frontend.id());
        }
        if let Some(mut flownode) = self
            .flownode_process
            .lock()
            .expect("another thread panicked while holding the lock")
            .take()
        {
            Env::stop_server(&mut flownode);
            println!("Flownode (pid = {}) is stopped", flownode.id());
        }
        if matches!(self.ctx.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster)
        {
            util::teardown_wal();
        }
    }
}

impl Drop for GreptimeDB {
    fn drop(&mut self) {
        if self.env.server_addrs.server_addr.is_none() {
            self.stop();
        }
    }
}

pub struct GreptimeDBContext {
    /// Start time in milliseconds.
    time: i64,
    datanode_id: AtomicU32,
    wal: WalConfig,
    store_config: StoreConfig,
    server_modes: Vec<ServerMode>,
}

impl GreptimeDBContext {
    pub fn new(wal: WalConfig, store_config: StoreConfig) -> Self {
        Self {
            time: common_time::util::current_time_millis(),
            datanode_id: AtomicU32::new(0),
            wal,
            store_config,
            server_modes: Vec::new(),
        }
    }

    pub(crate) fn time(&self) -> i64 {
        self.time
    }

    pub fn is_raft_engine(&self) -> bool {
        matches!(self.wal, WalConfig::RaftEngine)
    }

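    /// Broker endpoints rendered as a JSON array string for the Kafka WAL
    /// configuration; empty when the raft-engine WAL is used.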
    pub fn kafka_wal_broker_endpoints(&self) -> String {
        match &self.wal {
            WalConfig::RaftEngine => String::new(),
            WalConfig::Kafka {
                broker_endpoints, ..
            } => serde_json::to_string(&broker_endpoints).unwrap(),
        }
    }

    fn incr_datanode_id(&self) {
        let _ = self.datanode_id.fetch_add(1, Ordering::Relaxed);
    }

    fn reset_datanode_id(&self) {
        self.datanode_id.store(0, Ordering::Relaxed);
    }

    pub(crate) fn store_config(&self) -> StoreConfig {
        self.store_config.clone()
    }

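    /// Record the [`ServerMode`] used for the server at `idx` so it can be reused on
    /// restart; any gap below `idx` is filled with clones of `mode` before overwriting.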
    fn set_server_mode(&mut self, mode: ServerMode, idx: usize) {
        if idx >= self.server_modes.len() {
            self.server_modes.resize(idx + 1, mode.clone());
        }
        self.server_modes[idx] = mode;
    }

    fn get_server_mode(&self, idx: usize) -> Option<&ServerMode> {
        self.server_modes.get(idx)
    }
}