From bb4890cff8079217d377c7125417c1d4bf26ec2d Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Sat, 3 May 2025 08:28:32 +0800 Subject: [PATCH] refactor: datanode instance builder (#6034) remove another piece of REPL codes --- Cargo.lock | 119 --------------------- src/cli/Cargo.toml | 1 - src/cli/src/cmd.rs | 154 ---------------------------- src/cli/src/error.rs | 4 - src/cli/src/helper.rs | 112 -------------------- src/cli/src/lib.rs | 10 +- src/cmd/src/bin/greptime.rs | 10 +- src/cmd/src/datanode.rs | 101 +++--------------- src/cmd/src/datanode/builder.rs | 138 +++++++++++++++++++++++++ src/cmd/src/error.rs | 4 - src/cmd/src/standalone.rs | 10 +- src/datanode/src/datanode.rs | 74 ++++++------- src/datanode/src/error.rs | 7 -- tests-integration/src/cluster.rs | 11 +- tests-integration/src/standalone.rs | 12 +-- 15 files changed, 210 insertions(+), 557 deletions(-) delete mode 100644 src/cli/src/cmd.rs delete mode 100644 src/cli/src/helper.rs create mode 100644 src/cmd/src/datanode/builder.rs diff --git a/Cargo.lock b/Cargo.lock index 5955a1cf59..de53979816 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1883,7 +1883,6 @@ dependencies = [ "query", "rand 0.9.0", "reqwest", - "rustyline", "serde", "serde_json", "servers", @@ -1936,17 +1935,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "clipboard-win" -version = "4.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7191c27c2357d9b7ef96baac1773290d4ca63b24205b82a3fd8a0637afcf0362" -dependencies = [ - "error-code", - "str-buf", - "winapi", -] - [[package]] name = "clocksource" version = "0.8.1" @@ -3968,27 +3956,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs-next" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" -dependencies = [ - "cfg-if", - "dirs-sys-next", -] - -[[package]] -name = "dirs-sys-next" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" -dependencies = [ - "libc", - "redox_users", - "winapi", -] - [[package]] name = "displaydoc" version = "0.2.5" @@ -4143,12 +4110,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "endian-type" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" - [[package]] name = "enum-as-inner" version = "0.6.1" @@ -4219,16 +4180,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "error-code" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64f18991e7bf11e7ffee451b5318b5c1a73c52d0d0ada6e5a3017c8c1ced6a21" -dependencies = [ - "libc", - "str-buf", -] - [[package]] name = "etcd-client" version = "0.14.0" @@ -4336,17 +4287,6 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" -[[package]] -name = "fd-lock" -version = "3.0.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5" -dependencies = [ - "cfg-if", - "rustix", - "windows-sys 0.48.0", -] - [[package]] name = "file-engine" version = "0.15.0" @@ -7528,15 +7468,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "nibble_vec" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" -dependencies = [ - "smallvec", -] - [[package]] name = "nix" version = "0.25.1" @@ -9627,16 +9558,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" -[[package]] -name = "radix_trie" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" -dependencies = [ - "endian-type", - "nibble_vec", -] - [[package]] name = "raft-engine" version = "0.4.2" @@ -9817,17 +9738,6 @@ dependencies = [ "bitflags 2.9.0", ] -[[package]] -name = "redox_users" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" -dependencies = [ - "getrandom 0.2.15", - "libredox", - "thiserror 1.0.64", -] - [[package]] name = "ref-cast" version = "1.0.23" @@ -10461,29 +10371,6 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" -[[package]] -name = "rustyline" -version = "10.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1e83c32c3f3c33b08496e0d1df9ea8c64d39adb8eb36a1ebb1440c690697aef" -dependencies = [ - "bitflags 1.3.2", - "cfg-if", - "clipboard-win", - "dirs-next", - "fd-lock", - "libc", - "log", - "memchr", - "nix 0.25.1", - "radix_trie", - "scopeguard", - "unicode-segmentation", - "unicode-width", - "utf8parse", - "winapi", -] - [[package]] name = "ryu" version = "1.0.18" @@ -11704,12 +11591,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "str-buf" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e08d8363704e6c71fc928674353e6b7c23dcea9d82d7012c8faf2a3a025f8d0" - [[package]] name = "str_stack" version = "0.1.0" diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml index 302c3a292f..ff6f5dea13 100644 --- a/src/cli/Cargo.toml +++ b/src/cli/Cargo.toml @@ -51,7 +51,6 @@ opendal = { version = "0.51.1", features = [ query.workspace = true rand.workspace = true reqwest.workspace = true -rustyline = "10.1" serde.workspace = true serde_json.workspace = true servers.workspace = true diff --git a/src/cli/src/cmd.rs b/src/cli/src/cmd.rs deleted file mode 100644 index 557a02b385..0000000000 --- a/src/cli/src/cmd.rs +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::error::{Error, InvalidReplCommandSnafu, Result}; - -/// Represents the parsed command from the user (which may be over many lines) -#[derive(Debug, PartialEq)] -pub(crate) enum ReplCommand { - Help, - UseDatabase { db_name: String }, - Sql { sql: String }, - Exit, -} - -impl TryFrom<&str> for ReplCommand { - type Error = Error; - - fn try_from(input: &str) -> Result { - let input = input.trim(); - if input.is_empty() { - return InvalidReplCommandSnafu { - reason: "No command specified".to_string(), - } - .fail(); - } - - // If line ends with ';', it must be treated as a complete input. - // However, the opposite is not true. - let input_is_completed = input.ends_with(';'); - - let input = input.strip_suffix(';').map(|x| x.trim()).unwrap_or(input); - let lowercase = input.to_lowercase(); - match lowercase.as_str() { - "help" => Ok(Self::Help), - "exit" | "quit" => Ok(Self::Exit), - _ => match input.split_once(' ') { - Some((maybe_use, database)) if maybe_use.to_lowercase() == "use" => { - Ok(Self::UseDatabase { - db_name: database.trim().to_string(), - }) - } - // Any valid SQL must contains at least one whitespace. - Some(_) if input_is_completed => Ok(Self::Sql { - sql: input.to_string(), - }), - _ => InvalidReplCommandSnafu { - reason: format!("unknown command '{input}', maybe input is not completed"), - } - .fail(), - }, - } - } -} - -impl ReplCommand { - pub fn help() -> &'static str { - r#" -Available commands (case insensitive): -- 'help': print this help -- 'exit' or 'quit': exit the REPL -- 'use ': switch to another database/schema context -- Other typed in text will be treated as SQL. - You can enter new line while typing, just remember to end it with ';'. -"# - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::error::Error::InvalidReplCommand; - - #[test] - fn test_from_str() { - fn test_ok(s: &str, expected: ReplCommand) { - let actual: ReplCommand = s.try_into().unwrap(); - assert_eq!(expected, actual, "'{}'", s); - } - - fn test_err(s: &str) { - let result: Result = s.try_into(); - assert!(matches!(result, Err(InvalidReplCommand { .. }))) - } - - test_err(""); - test_err(" "); - test_err("\t"); - - test_ok("help", ReplCommand::Help); - test_ok("help", ReplCommand::Help); - test_ok(" help", ReplCommand::Help); - test_ok(" help ", ReplCommand::Help); - test_ok(" HELP ", ReplCommand::Help); - test_ok(" Help; ", ReplCommand::Help); - test_ok(" help ; ", ReplCommand::Help); - - test_ok("exit", ReplCommand::Exit); - test_ok("exit;", ReplCommand::Exit); - test_ok("exit ;", ReplCommand::Exit); - test_ok("EXIT", ReplCommand::Exit); - - test_ok("quit", ReplCommand::Exit); - test_ok("quit;", ReplCommand::Exit); - test_ok("quit ;", ReplCommand::Exit); - test_ok("QUIT", ReplCommand::Exit); - - test_ok( - "use Foo", - ReplCommand::UseDatabase { - db_name: "Foo".to_string(), - }, - ); - test_ok( - " use Foo ; ", - ReplCommand::UseDatabase { - db_name: "Foo".to_string(), - }, - ); - // ensure that database name is case sensitive - test_ok( - " use FOO ; ", - ReplCommand::UseDatabase { - db_name: "FOO".to_string(), - }, - ); - - // ensure that we aren't messing with capitalization - test_ok( - "SELECT * from foo;", - ReplCommand::Sql { - sql: "SELECT * from foo".to_string(), - }, - ); - // Input line (that don't belong to any other cases above) must ends with ';' to make it a valid SQL. - test_err("insert blah"); - test_ok( - "insert blah;", - ReplCommand::Sql { - sql: "insert blah".to_string(), - }, - ); - } -} diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs index 2c18531aaa..6fefbf798e 100644 --- a/src/cli/src/error.rs +++ b/src/cli/src/error.rs @@ -101,9 +101,6 @@ pub enum Error { error: reqwest::Error, }, - #[snafu(display("Invalid REPL command: {reason}"))] - InvalidReplCommand { reason: String }, - #[snafu(display("Failed to parse SQL: {}", sql))] ParseSql { sql: String, @@ -254,7 +251,6 @@ impl ErrorExt for Error { Error::MissingConfig { .. } | Error::LoadLayeredConfig { .. } | Error::IllegalConfig { .. } - | Error::InvalidReplCommand { .. } | Error::InitTimezone { .. } | Error::ConnectEtcd { .. } | Error::CreateDir { .. } diff --git a/src/cli/src/helper.rs b/src/cli/src/helper.rs deleted file mode 100644 index ee47e0f577..0000000000 --- a/src/cli/src/helper.rs +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::borrow::Cow; - -use rustyline::completion::Completer; -use rustyline::highlight::{Highlighter, MatchingBracketHighlighter}; -use rustyline::hint::{Hinter, HistoryHinter}; -use rustyline::validate::{ValidationContext, ValidationResult, Validator}; - -use crate::cmd::ReplCommand; - -pub(crate) struct RustylineHelper { - hinter: HistoryHinter, - highlighter: MatchingBracketHighlighter, -} - -impl Default for RustylineHelper { - fn default() -> Self { - Self { - hinter: HistoryHinter {}, - highlighter: MatchingBracketHighlighter::default(), - } - } -} - -impl rustyline::Helper for RustylineHelper {} - -impl Validator for RustylineHelper { - fn validate(&self, ctx: &mut ValidationContext<'_>) -> rustyline::Result { - let input = ctx.input(); - match ReplCommand::try_from(input) { - Ok(_) => Ok(ValidationResult::Valid(None)), - Err(e) => { - if input.trim_end().ends_with(';') { - // If line ends with ';', it HAS to be a valid command. - Ok(ValidationResult::Invalid(Some(e.to_string()))) - } else { - Ok(ValidationResult::Incomplete) - } - } - } - } -} - -impl Hinter for RustylineHelper { - type Hint = String; - - fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option { - self.hinter.hint(line, pos, ctx) - } -} - -impl Highlighter for RustylineHelper { - fn highlight<'l>(&self, line: &'l str, pos: usize) -> Cow<'l, str> { - self.highlighter.highlight(line, pos) - } - - fn highlight_prompt<'b, 's: 'b, 'p: 'b>( - &'s self, - prompt: &'p str, - default: bool, - ) -> Cow<'b, str> { - self.highlighter.highlight_prompt(prompt, default) - } - - fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> { - use nu_ansi_term::Style; - Cow::Owned(Style::new().dimmed().paint(hint).to_string()) - } - - fn highlight_candidate<'c>( - &self, - candidate: &'c str, - completion: rustyline::CompletionType, - ) -> Cow<'c, str> { - self.highlighter.highlight_candidate(candidate, completion) - } - - fn highlight_char(&self, line: &str, pos: usize) -> bool { - self.highlighter.highlight_char(line, pos) - } -} - -impl Completer for RustylineHelper { - type Candidate = String; - - fn complete( - &self, - line: &str, - pos: usize, - ctx: &rustyline::Context<'_>, - ) -> rustyline::Result<(usize, Vec)> { - // If there is a hint, use that as the auto-complete when user hits `tab` - if let Some(hint) = self.hinter.hint(line, pos, ctx) { - Ok((pos, vec![hint])) - } else { - Ok((0, vec![])) - } - } -} diff --git a/src/cli/src/lib.rs b/src/cli/src/lib.rs index 113e88f1c1..2e63813a87 100644 --- a/src/cli/src/lib.rs +++ b/src/cli/src/lib.rs @@ -13,15 +13,9 @@ // limitations under the License. mod bench; -pub mod error; -// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 -#[allow(unused)] -mod cmd; -mod export; -mod helper; - -// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373 mod database; +pub mod error; +mod export; mod import; use async_trait::async_trait; diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index 54659833fa..e57241f638 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -15,9 +15,11 @@ #![doc = include_str!("../../../../README.md")] use clap::{Parser, Subcommand}; +use cmd::datanode::builder::InstanceBuilder; use cmd::error::{InitTlsProviderSnafu, Result}; use cmd::options::GlobalOptions; use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App}; +use common_base::Plugins; use common_version::version; use servers::install_ring_crypto_provider; @@ -102,10 +104,10 @@ async fn main_body() -> Result<()> { async fn start(cli: Command) -> Result<()> { match cli.subcmd { SubCommand::Datanode(cmd) => { - cmd.build(cmd.load_options(&cli.global_options)?) - .await? - .run() - .await + let opts = cmd.load_options(&cli.global_options)?; + let plugins = Plugins::new(); + let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?; + cmd.build_with(builder).await?.run().await } SubCommand::Flownode(cmd) => { cmd.build(cmd.load_options(&cli.global_options)?) diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index af3674138c..1cd270aa4c 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -12,33 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +pub mod builder; + use std::time::Duration; use async_trait::async_trait; -use cache::build_datanode_cache_registry; -use catalog::kvbackend::MetaKvBackend; use clap::Parser; -use common_base::Plugins; use common_config::Configurable; -use common_meta::cache::LayeredCacheRegistryBuilder; use common_telemetry::logging::TracingOptions; use common_telemetry::{info, warn}; -use common_version::{short_version, version}; use common_wal::config::DatanodeWalConfig; -use datanode::datanode::{Datanode, DatanodeBuilder}; -use datanode::service::DatanodeServiceBuilder; -use meta_client::{MetaClientOptions, MetaClientType}; -use servers::Mode; -use snafu::{ensure, OptionExt, ResultExt}; +use datanode::datanode::Datanode; +use meta_client::MetaClientOptions; +use snafu::{ensure, ResultExt}; use tracing_appender::non_blocking::WorkerGuard; +use crate::datanode::builder::InstanceBuilder; use crate::error::{ - LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, - StartDatanodeSnafu, + LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; -use crate::{log_versions, App}; +use crate::App; pub const APP_NAME: &str = "greptime-datanode"; @@ -98,8 +92,8 @@ pub struct Command { } impl Command { - pub async fn build(&self, opts: DatanodeOptions) -> Result { - self.subcmd.build(opts).await + pub async fn build_with(&self, builder: InstanceBuilder) -> Result { + self.subcmd.build_with(builder).await } pub fn load_options(&self, global_options: &GlobalOptions) -> Result { @@ -115,9 +109,12 @@ enum SubCommand { } impl SubCommand { - async fn build(&self, opts: DatanodeOptions) -> Result { + async fn build_with(&self, builder: InstanceBuilder) -> Result { match self { - SubCommand::Start(cmd) => cmd.build(opts).await, + SubCommand::Start(cmd) => { + info!("Building datanode with {:#?}", cmd); + builder.build().await + } } } } @@ -263,74 +260,6 @@ impl StartCommand { Ok(()) } - - async fn build(&self, opts: DatanodeOptions) -> Result { - common_runtime::init_global_runtimes(&opts.runtime); - - let guard = common_telemetry::init_global_logging( - APP_NAME, - &opts.component.logging, - &opts.component.tracing, - opts.component.node_id.map(|x| x.to_string()), - ); - log_versions(version(), short_version(), APP_NAME); - - info!("Datanode start command: {:#?}", self); - info!("Datanode options: {:#?}", opts); - - let plugin_opts = opts.plugins; - let mut opts = opts.component; - opts.grpc.detect_server_addr(); - let mut plugins = Plugins::new(); - plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &opts) - .await - .context(StartDatanodeSnafu)?; - - let member_id = opts - .node_id - .context(MissingConfigSnafu { msg: "'node_id'" })?; - - let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu { - msg: "'meta_client_options'", - })?; - - let meta_client = meta_client::create_meta_client( - MetaClientType::Datanode { member_id }, - meta_config, - None, - ) - .await - .context(MetaClientInitSnafu)?; - - let meta_backend = Arc::new(MetaKvBackend { - client: meta_client.clone(), - }); - - // Builds cache registry for datanode. - let layered_cache_registry = Arc::new( - LayeredCacheRegistryBuilder::default() - .add_cache_registry(build_datanode_cache_registry(meta_backend.clone())) - .build(), - ); - - let mut datanode = DatanodeBuilder::new(opts.clone(), plugins, Mode::Distributed) - .with_meta_client(meta_client) - .with_kv_backend(meta_backend) - .with_cache_registry(layered_cache_registry) - .build() - .await - .context(StartDatanodeSnafu)?; - - let services = DatanodeServiceBuilder::new(&opts) - .with_default_grpc_server(&datanode.region_server()) - .enable_http_service() - .build() - .await - .context(StartDatanodeSnafu)?; - datanode.setup_services(services); - - Ok(Instance::new(datanode, guard)) - } } #[cfg(test)] diff --git a/src/cmd/src/datanode/builder.rs b/src/cmd/src/datanode/builder.rs new file mode 100644 index 0000000000..40180563a2 --- /dev/null +++ b/src/cmd/src/datanode/builder.rs @@ -0,0 +1,138 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use cache::build_datanode_cache_registry; +use catalog::kvbackend::MetaKvBackend; +use common_base::Plugins; +use common_meta::cache::LayeredCacheRegistryBuilder; +use common_telemetry::info; +use common_version::{short_version, version}; +use datanode::datanode::DatanodeBuilder; +use datanode::service::DatanodeServiceBuilder; +use meta_client::MetaClientType; +use snafu::{OptionExt, ResultExt}; +use tracing_appender::non_blocking::WorkerGuard; + +use crate::datanode::{DatanodeOptions, Instance, APP_NAME}; +use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu}; +use crate::log_versions; + +/// Builder for Datanode instance. +pub struct InstanceBuilder { + guard: Vec, + opts: DatanodeOptions, + datanode_builder: DatanodeBuilder, +} + +impl InstanceBuilder { + /// Try to create a new [InstanceBuilder], and do some initialization work like allocating + /// runtime resources, setting up global logging and plugins, etc. + pub async fn try_new_with_init( + mut opts: DatanodeOptions, + mut plugins: Plugins, + ) -> Result { + let guard = Self::init(&mut opts, &mut plugins).await?; + + let datanode_builder = Self::datanode_builder(&opts, plugins).await?; + + Ok(Self { + guard, + opts, + datanode_builder, + }) + } + + async fn init(opts: &mut DatanodeOptions, plugins: &mut Plugins) -> Result> { + common_runtime::init_global_runtimes(&opts.runtime); + + let dn_opts = &mut opts.component; + let guard = common_telemetry::init_global_logging( + APP_NAME, + &dn_opts.logging, + &dn_opts.tracing, + dn_opts.node_id.map(|x| x.to_string()), + ); + + log_versions(version(), short_version(), APP_NAME); + + plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts) + .await + .context(StartDatanodeSnafu)?; + + dn_opts.grpc.detect_server_addr(); + + info!("Initialized Datanode instance with {:#?}", opts); + Ok(guard) + } + + async fn datanode_builder(opts: &DatanodeOptions, plugins: Plugins) -> Result { + let dn_opts = &opts.component; + + let member_id = dn_opts + .node_id + .context(MissingConfigSnafu { msg: "'node_id'" })?; + let meta_client_options = dn_opts.meta_client.as_ref().context(MissingConfigSnafu { + msg: "meta client options", + })?; + let client = meta_client::create_meta_client( + MetaClientType::Datanode { member_id }, + meta_client_options, + Some(&plugins), + ) + .await + .context(MetaClientInitSnafu)?; + + let backend = Arc::new(MetaKvBackend { + client: client.clone(), + }); + let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone()); + + let registry = Arc::new( + LayeredCacheRegistryBuilder::default() + .add_cache_registry(build_datanode_cache_registry(backend)) + .build(), + ); + builder + .with_cache_registry(registry) + .with_meta_client(client.clone()); + Ok(builder) + } + + /// Get the mutable builder for Datanode, in case you want to change some fields before the + /// final construction. + pub fn mut_datanode_builder(&mut self) -> &mut DatanodeBuilder { + &mut self.datanode_builder + } + + /// Try to build the Datanode instance. + pub async fn build(self) -> Result { + let mut datanode = self + .datanode_builder + .build() + .await + .context(StartDatanodeSnafu)?; + + let services = DatanodeServiceBuilder::new(&self.opts.component) + .with_default_grpc_server(&datanode.region_server()) + .enable_http_service() + .build() + .await + .context(StartDatanodeSnafu)?; + datanode.setup_services(services); + + Ok(Instance::new(datanode, self.guard)) + } +} diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index a671290503..ed9dda22d3 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -177,9 +177,6 @@ pub enum Error { source: meta_srv::error::Error, }, - #[snafu(display("Invalid REPL command: {reason}"))] - InvalidReplCommand { reason: String }, - #[snafu(display("Failed to parse SQL: {}", sql))] ParseSql { sql: String, @@ -331,7 +328,6 @@ impl ErrorExt for Error { Error::MissingConfig { .. } | Error::LoadLayeredConfig { .. } | Error::IllegalConfig { .. } - | Error::InvalidReplCommand { .. } | Error::InitTimezone { .. } | Error::ConnectEtcd { .. } | Error::CreateDir { .. } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 5877329698..492d399f61 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -75,7 +75,6 @@ use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask}; use servers::grpc::GrpcOptions; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; -use servers::Mode; use snafu::ResultExt; use tokio::sync::RwLock; use tracing_appender::non_blocking::WorkerGuard; @@ -497,12 +496,9 @@ impl StartCommand { .build(), ); - let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone) - .with_kv_backend(kv_backend.clone()) - .with_cache_registry(layered_cache_registry.clone()) - .build() - .await - .context(error::StartDatanodeSnafu)?; + let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone()); + builder.with_cache_registry(layered_cache_registry.clone()); + let datanode = builder.build().await.context(error::StartDatanodeSnafu)?; let information_extension = Arc::new(StandaloneInformationExtension::new( datanode.region_server(), diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index b0a02eb46e..1bb06f1cf9 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -43,6 +43,7 @@ use mito2::config::MitoConfig; use mito2::engine::MitoEngine; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; +use query::dummy_catalog::TableProviderFactoryRef; use query::QueryEngineFactory; use servers::export_metrics::ExportMetricsTask; use servers::server::ServerHandlers; @@ -58,8 +59,8 @@ use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; use crate::error::{ self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, - MissingCacheSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, - ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu, + MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu, + ShutdownServerSnafu, StartServerSnafu, }; use crate::event_listener::{ new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef, @@ -157,50 +158,45 @@ impl Datanode { pub struct DatanodeBuilder { opts: DatanodeOptions, - mode: Mode, + table_provider_factory: Option, plugins: Plugins, meta_client: Option, - kv_backend: Option, + kv_backend: KvBackendRef, cache_registry: Option>, } impl DatanodeBuilder { - /// `kv_backend` is optional. If absent, the builder will try to build one - /// by using the given `opts` - pub fn new(opts: DatanodeOptions, plugins: Plugins, mode: Mode) -> Self { + pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self { Self { opts, - mode, + table_provider_factory: None, plugins, meta_client: None, - kv_backend: None, + kv_backend, cache_registry: None, } } - pub fn with_meta_client(self, meta_client: MetaClientRef) -> Self { - Self { - meta_client: Some(meta_client), - ..self - } + pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self { + self.meta_client = Some(client); + self } - pub fn with_cache_registry(self, cache_registry: Arc) -> Self { - Self { - cache_registry: Some(cache_registry), - ..self - } + pub fn with_cache_registry(&mut self, registry: Arc) -> &mut Self { + self.cache_registry = Some(registry); + self } - pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self { - Self { - kv_backend: Some(kv_backend), - ..self - } + pub fn kv_backend(&self) -> &KvBackendRef { + &self.kv_backend + } + + pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self { + self.table_provider_factory = Some(factory); + self } pub async fn build(mut self) -> Result { - let mode = &self.mode; let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; let meta_client = self.meta_client.take(); @@ -210,8 +206,6 @@ impl DatanodeBuilder { // writable upon open. let controlled_by_metasrv = meta_client.is_some(); - let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?; - // build and initialize region server let (region_event_listener, region_event_receiver) = if controlled_by_metasrv { let (tx, rx) = new_region_server_event_channel(); @@ -233,7 +227,7 @@ impl DatanodeBuilder { .new_region_server(schema_metadata_manager, region_event_listener) .await?; - let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone()); let table_values = datanode_table_manager .tables(node_id) .try_collect::>() @@ -273,9 +267,14 @@ impl DatanodeBuilder { None }; + let mode = if heartbeat_task.is_none() { + Mode::Standalone + } else { + Mode::Distributed + }; let greptimedb_telemetry_task = get_greptimedb_telemetry_task( Some(self.opts.storage.data_home.clone()), - mode, + &mode, self.opts.enable_telemetry, ) .await; @@ -363,7 +362,11 @@ impl DatanodeBuilder { ); let query_engine = query_engine_factory.query_engine(); - let table_provider_factory = Arc::new(DummyTableProviderFactory); + let table_provider_factory = self + .table_provider_factory + .clone() + .unwrap_or_else(|| Arc::new(DummyTableProviderFactory)); + let mut region_server = RegionServer::with_table_provider( query_engine, common_runtime::global_runtime(), @@ -635,7 +638,6 @@ mod tests { use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; use mito2::engine::MITO_ENGINE_NAME; - use servers::Mode; use store_api::region_request::RegionRequest; use store_api::storage::RegionId; @@ -671,19 +673,19 @@ mod tests { let kv_backend = Arc::new(MemoryKvBackend::new()); let layered_cache_registry = Arc::new( LayeredCacheRegistryBuilder::default() - .add_cache_registry(build_datanode_cache_registry(kv_backend)) + .add_cache_registry(build_datanode_cache_registry(kv_backend.clone())) .build(), ); - let builder = DatanodeBuilder::new( + let mut builder = DatanodeBuilder::new( DatanodeOptions { node_id: Some(0), ..Default::default() }, Plugins::default(), - Mode::Standalone, - ) - .with_cache_registry(layered_cache_registry); + kv_backend, + ); + builder.with_cache_registry(layered_cache_registry); let kv = Arc::new(MemoryKvBackend::default()) as _; setup_table_datanode(&kv).await; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 84e8168d91..5c81f6ab46 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -150,12 +150,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Expect KvBackend but not found"))] - MissingKvBackend { - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid SQL, error: {}", msg))] InvalidSql { msg: String }, @@ -426,7 +420,6 @@ impl ErrorExt for Error { | MissingRequiredField { .. } | RegionEngineNotFound { .. } | ParseAddr { .. } - | MissingKvBackend { .. } | TomlFormat { .. } => StatusCode::InvalidArguments, PayloadNotExist { .. } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 0b18a71d3a..adb047c823 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -59,7 +59,6 @@ use servers::grpc::flight::FlightCraftWrapper; use servers::grpc::region_server::RegionServerRequestHandler; use servers::heartbeat_options::HeartbeatOptions; use servers::server::ServerHandlers; -use servers::Mode; use tempfile::TempDir; use tonic::codec::CompressionEncoding; use tonic::transport::Server; @@ -333,13 +332,11 @@ impl GreptimeDbClusterBuilder { .build(), ); - let mut datanode = DatanodeBuilder::new(opts, Plugins::default(), Mode::Distributed) - .with_kv_backend(meta_backend) + let mut builder = DatanodeBuilder::new(opts, Plugins::default(), meta_backend); + builder .with_cache_registry(layered_cache_registry) - .with_meta_client(meta_client) - .build() - .await - .unwrap(); + .with_meta_client(meta_client); + let mut datanode = builder.build().await.unwrap(); datanode.start_heartbeat().await.unwrap(); diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index e25e2a9682..e8224688ba 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -49,7 +49,6 @@ use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use query::stats::StatementStatistics; use servers::grpc::GrpcOptions; use servers::server::ServerHandlers; -use servers::Mode; use snafu::ResultExt; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -144,13 +143,10 @@ impl GreptimeDbStandaloneBuilder { .build(), ); - let datanode = - DatanodeBuilder::new(opts.datanode_options(), plugins.clone(), Mode::Standalone) - .with_kv_backend(kv_backend.clone()) - .with_cache_registry(layered_cache_registry) - .build() - .await - .unwrap(); + let mut builder = + DatanodeBuilder::new(opts.datanode_options(), plugins.clone(), kv_backend.clone()); + builder.with_cache_registry(layered_cache_registry); + let datanode = builder.build().await.unwrap(); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap();