diff --git a/Cargo.lock b/Cargo.lock index e2b79e0835..37fd88cb46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,7 +1624,6 @@ dependencies = [ "common-time", "common-version", "common-wal", - "config", "datanode", "datatypes", "either", @@ -1728,9 +1727,22 @@ name = "common-config" version = "0.7.2" dependencies = [ "common-base", + "common-error", + "common-macro", + "common-telemetry", + "common-test-util", + "common-wal", + "config", + "datanode", + "meta-client", "num_cpus", "serde", + "serde_json", + "snafu 0.8.2", "sysinfo", + "temp-env", + "tempfile", + "toml 0.8.12", ] [[package]] @@ -3149,6 +3161,7 @@ dependencies = [ "catalog", "client", "common-base", + "common-config", "common-error", "common-function", "common-greptimedb-telemetry", @@ -5601,6 +5614,7 @@ dependencies = [ "client", "common-base", "common-catalog", + "common-config", "common-error", "common-greptimedb-telemetry", "common-grpc", diff --git a/Cargo.toml b/Cargo.toml index 32fb4f2ee3..1ece9e77fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,7 @@ bytemuck = "1.12" bytes = { version = "1.5", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.4", features = ["derive"] } +config = "0.13.0" crossbeam-utils = "0.8" dashmap = "5.4" datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" } diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index fbef84fdca..7052df9224 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -40,7 +40,6 @@ common-telemetry = { workspace = true, features = [ common-time.workspace = true common-version.workspace = true common-wal.workspace = true -config = "0.13" datanode.workspace = true datatypes.workspace = true either = "1.8" diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 62400fd6a9..c21e034999 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -18,6 +18,7 @@ use std::time::Duration; use async_trait::async_trait; use catalog::kvbackend::MetaKvBackend; use clap::Parser; +use common_config::Configurable; use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_wal::config::DatanodeWalConfig; @@ -28,7 +29,9 @@ use meta_client::MetaClientOptions; use servers::Mode; use snafu::{OptionExt, ResultExt}; -use crate::error::{MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu}; +use crate::error::{ + LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu, +}; use crate::options::{GlobalOptions, Options}; use crate::App; @@ -133,12 +136,24 @@ struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - let mut opts: DatanodeOptions = Options::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - DatanodeOptions::env_list_keys(), - )?; + Ok(Options::Datanode(Box::new( + self.merge_with_cli_options( + global_options, + DatanodeOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), + ) + .context(LoadLayeredConfigSnafu)?, + )?, + ))) + } + // The precedence order is: cli > config file > environment variables > default values. + fn merge_with_cli_options( + &self, + global_options: &GlobalOptions, + mut opts: DatanodeOptions, + ) -> Result { if let Some(dir) = &global_options.log_dir { opts.logging.dir.clone_from(dir); } @@ -208,7 +223,7 @@ impl StartCommand { // Disable dashboard in datanode. opts.http.disable_dashboard = true; - Ok(Options::Datanode(Box::new(opts))) + Ok(opts) } async fn build(self, mut opts: DatanodeOptions) -> Result { @@ -259,13 +274,14 @@ mod tests { use std::io::Write; use std::time::Duration; + use common_config::ENV_VAR_SEP; use common_test_util::temp_dir::create_named_temp_file; use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config}; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; use super::*; - use crate::options::{GlobalOptions, ENV_VAR_SEP}; + use crate::options::GlobalOptions; #[test] fn test_read_from_config_file() { diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 8a3e23eaa3..0e1fec26df 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -17,7 +17,6 @@ use std::any::Any; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; -use config::ConfigError; use rustyline::error::ReadlineError; use snafu::{Location, Snafu}; @@ -209,8 +208,8 @@ pub enum Error { #[snafu(display("Failed to load layered config"))] LoadLayeredConfig { - #[snafu(source)] - error: ConfigError, + #[snafu(source(from(common_config::error::Error, Box::new)))] + source: Box, #[snafu(implicit)] location: Location, }, diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 7cc2a2acca..4297553304 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -23,6 +23,7 @@ use cache::{ use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; use client::client_manager::DatanodeClients; +use common_config::Configurable; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; @@ -40,7 +41,9 @@ use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::{OptionExt, ResultExt}; -use crate::error::{self, InitTimezoneSnafu, MissingConfigSnafu, Result, StartFrontendSnafu}; +use crate::error::{ + self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu, +}; use crate::options::{GlobalOptions, Options}; use crate::App; @@ -153,12 +156,24 @@ pub struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - let mut opts: FrontendOptions = Options::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - FrontendOptions::env_list_keys(), - )?; + Ok(Options::Frontend(Box::new( + self.merge_with_cli_options( + global_options, + FrontendOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), + ) + .context(LoadLayeredConfigSnafu)?, + )?, + ))) + } + // The precedence order is: cli > config file > environment variables > default values. + fn merge_with_cli_options( + &self, + global_options: &GlobalOptions, + mut opts: FrontendOptions, + ) -> Result { if let Some(dir) = &global_options.log_dir { opts.logging.dir.clone_from(dir); } @@ -220,7 +235,7 @@ impl StartCommand { opts.user_provider.clone_from(&self.user_provider); - Ok(Options::Frontend(Box::new(opts))) + Ok(opts) } async fn build(self, mut opts: FrontendOptions) -> Result { @@ -336,12 +351,13 @@ mod tests { use auth::{Identity, Password, UserProviderRef}; use common_base::readable_size::ReadableSize; + use common_config::ENV_VAR_SEP; use common_test_util::temp_dir::create_named_temp_file; use frontend::service_config::GrpcOptions; use servers::http::HttpOptions; use super::*; - use crate::options::{GlobalOptions, ENV_VAR_SEP}; + use crate::options::GlobalOptions; #[test] fn test_try_from_start_command() { diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index fbc605acaf..f6e7a0c3c4 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -16,13 +16,14 @@ use std::time::Duration; use async_trait::async_trait; use clap::Parser; +use common_config::Configurable; use common_telemetry::info; use common_telemetry::logging::TracingOptions; use meta_srv::bootstrap::MetasrvInstance; use meta_srv::metasrv::MetasrvOptions; use snafu::ResultExt; -use crate::error::{self, Result, StartMetaServerSnafu}; +use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu}; use crate::options::{GlobalOptions, Options}; use crate::App; @@ -128,12 +129,24 @@ struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - let mut opts: MetasrvOptions = Options::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - MetasrvOptions::env_list_keys(), - )?; + Ok(Options::Metasrv(Box::new( + self.merge_with_cli_options( + global_options, + MetasrvOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), + ) + .context(LoadLayeredConfigSnafu)?, + )?, + ))) + } + // The precedence order is: cli > config file > environment variables > default values. + fn merge_with_cli_options( + &self, + global_options: &GlobalOptions, + mut opts: MetasrvOptions, + ) -> Result { if let Some(dir) = &global_options.log_dir { opts.logging.dir.clone_from(dir); } @@ -196,7 +209,7 @@ impl StartCommand { // Disable dashboard in metasrv. opts.http.disable_dashboard = true; - Ok(Options::Metasrv(Box::new(opts))) + Ok(opts) } async fn build(self, mut opts: MetasrvOptions) -> Result { @@ -225,11 +238,11 @@ mod tests { use std::io::Write; use common_base::readable_size::ReadableSize; + use common_config::ENV_VAR_SEP; use common_test_util::temp_dir::create_named_temp_file; use meta_srv::selector::SelectorType; use super::*; - use crate::options::ENV_VAR_SEP; #[test] fn test_read_from_cmd() { diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 8bbaba4aa1..98b2fd0d2a 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -14,19 +14,12 @@ use clap::Parser; use common_telemetry::logging::{LoggingOptions, TracingOptions}; -use config::{Config, Environment, File, FileFormat}; use datanode::config::DatanodeOptions; use frontend::frontend::FrontendOptions; use meta_srv::metasrv::MetasrvOptions; -use serde::{Deserialize, Serialize}; -use snafu::ResultExt; -use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu}; use crate::standalone::StandaloneOptions; -pub const ENV_VAR_SEP: &str = "__"; -pub const ENV_LIST_SEP: &str = ","; - pub enum Options { Datanode(Box), Frontend(Box), @@ -71,65 +64,6 @@ impl Options { } } - /// Load the configuration from multiple sources and merge them. - /// The precedence order is: config file > environment variables > default values. - /// `env_prefix` is the prefix of environment variables, e.g. "FRONTEND__xxx". - /// The function will use dunder(double underscore) `__` as the separator for environment variables, for example: - /// `DATANODE__STORAGE__MANIFEST__CHECKPOINT_MARGIN` will be mapped to `DatanodeOptions.storage.manifest.checkpoint_margin` field in the configuration. - /// `list_keys` is the list of keys that should be parsed as a list, for example, you can pass `Some(&["meta_client_options.metasrv_addrs"]` to parse `GREPTIMEDB_METASRV__META_CLIENT_OPTIONS__METASRV_ADDRS` as a list. - /// The function will use comma `,` as the separator for list values, for example: `127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003`. - pub fn load_layered_options<'de, T: Serialize + Deserialize<'de> + Default>( - config_file: Option<&str>, - env_prefix: &str, - list_keys: Option<&[&str]>, - ) -> Result { - let default_opts = T::default(); - - let env_source = { - let mut env = Environment::default(); - - if !env_prefix.is_empty() { - env = env.prefix(env_prefix); - } - - if let Some(list_keys) = list_keys { - env = env.list_separator(ENV_LIST_SEP); - for key in list_keys { - env = env.with_list_parse_key(key); - } - } - - env.try_parsing(true) - .separator(ENV_VAR_SEP) - .ignore_empty(true) - }; - - // Workaround: Replacement for `Config::try_from(&default_opts)` due to - // `ConfigSerializer` cannot handle the case of an empty struct contained - // within an iterative structure. - // See: https://github.com/mehcode/config-rs/issues/461 - let json_str = serde_json::to_string(&default_opts).context(SerdeJsonSnafu)?; - let default_config = File::from_str(&json_str, FileFormat::Json); - - // Add default values and environment variables as the sources of the configuration. - let mut layered_config = Config::builder() - .add_source(default_config) - .add_source(env_source); - - // Add config file as the source of the configuration if it is specified. - if let Some(config_file) = config_file { - layered_config = layered_config.add_source(File::new(config_file, FileFormat::Toml)); - } - - let opts = layered_config - .build() - .context(LoadLayeredConfigSnafu)? - .try_deserialize() - .context(LoadLayeredConfigSnafu)?; - - Ok(opts) - } - pub fn node_id(&self) -> Option { match self { Options::Metasrv(_) | Options::Cli(_) | Options::Standalone(_) => None, @@ -138,127 +72,3 @@ impl Options { } } } - -#[cfg(test)] -mod tests { - use std::io::Write; - - use common_test_util::temp_dir::create_named_temp_file; - use common_wal::config::DatanodeWalConfig; - use datanode::config::{DatanodeOptions, ObjectStoreConfig}; - - use super::*; - - #[test] - fn test_load_layered_options() { - let mut file = create_named_temp_file(); - let toml_str = r#" - mode = "distributed" - enable_memory_catalog = false - rpc_addr = "127.0.0.1:3001" - rpc_hostname = "127.0.0.1" - rpc_runtime_size = 8 - mysql_addr = "127.0.0.1:4406" - mysql_runtime_size = 2 - - [meta_client] - timeout = "3s" - connect_timeout = "5s" - tcp_nodelay = true - - [wal] - provider = "raft_engine" - dir = "/tmp/greptimedb/wal" - file_size = "1GB" - purge_threshold = "50GB" - purge_interval = "10m" - read_batch_size = 128 - sync_write = false - - [logging] - level = "debug" - dir = "/tmp/greptimedb/test/logs" - "#; - write!(file, "{}", toml_str).unwrap(); - - let env_prefix = "DATANODE_UT"; - temp_env::with_vars( - // The following environment variables will be used to override the values in the config file. - [ - ( - // storage.type = S3 - [ - env_prefix.to_string(), - "storage".to_uppercase(), - "type".to_uppercase(), - ] - .join(ENV_VAR_SEP), - Some("S3"), - ), - ( - // storage.bucket = mybucket - [ - env_prefix.to_string(), - "storage".to_uppercase(), - "bucket".to_uppercase(), - ] - .join(ENV_VAR_SEP), - Some("mybucket"), - ), - ( - // wal.dir = /other/wal/dir - [ - env_prefix.to_string(), - "wal".to_uppercase(), - "dir".to_uppercase(), - ] - .join(ENV_VAR_SEP), - Some("/other/wal/dir"), - ), - ( - // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003 - [ - env_prefix.to_string(), - "meta_client".to_uppercase(), - "metasrv_addrs".to_uppercase(), - ] - .join(ENV_VAR_SEP), - Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"), - ), - ], - || { - let opts: DatanodeOptions = Options::load_layered_options( - Some(file.path().to_str().unwrap()), - env_prefix, - DatanodeOptions::env_list_keys(), - ) - .unwrap(); - - // Check the configs from environment variables. - match &opts.storage.store { - ObjectStoreConfig::S3(s3_config) => { - assert_eq!(s3_config.bucket, "mybucket".to_string()); - } - _ => panic!("unexpected store type"), - } - assert_eq!( - opts.meta_client.unwrap().metasrv_addrs, - vec![ - "127.0.0.1:3001".to_string(), - "127.0.0.1:3002".to_string(), - "127.0.0.1:3003".to_string() - ] - ); - - // Should be the values from config file, not environment variables. - let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else { - unreachable!() - }; - assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal"); - - // Should be default values. - assert_eq!(opts.node_id, None); - }, - ); - } -} diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 22220e553e..ff810ead5d 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -23,7 +23,7 @@ use cache::{ use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; -use common_config::{metadata_store_dir, KvBackendConfig}; +use common_config::{metadata_store_dir, Configurable, KvBackendConfig}; use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef}; @@ -45,7 +45,6 @@ use common_wal::config::StandaloneWalConfig; use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig}; use datanode::datanode::{Datanode, DatanodeBuilder}; use file_engine::config::EngineConfig as FileEngineConfig; -use frontend::error::{Result as FeResult, TomlFormatSnafu}; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; @@ -64,9 +63,9 @@ use snafu::{OptionExt, ResultExt}; use crate::error::{ BuildCacheRegistrySnafu, CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu, - InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, Result, ShutdownDatanodeSnafu, - ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, - StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, + InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, Result, + ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, + StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; use crate::options::{GlobalOptions, Options}; use crate::App; @@ -131,12 +130,6 @@ pub struct StandaloneOptions { pub tracing: TracingOptions, } -impl StandaloneOptions { - pub fn env_list_keys() -> Option<&'static [&'static str]> { - Some(&["wal.broker_endpoints"]) - } -} - impl Default for StandaloneOptions { fn default() -> Self { Self { @@ -166,6 +159,12 @@ impl Default for StandaloneOptions { } } +impl Configurable<'_> for StandaloneOptions { + fn env_list_keys() -> Option<&'static [&'static str]> { + Some(&["wal.broker_endpoints"]) + } +} + impl StandaloneOptions { pub fn frontend_options(&self) -> FrontendOptions { let cloned_opts = self.clone(); @@ -200,10 +199,6 @@ impl StandaloneOptions { ..Default::default() } } - - pub fn to_toml(&self) -> FeResult { - toml::to_string(self).context(TomlFormatSnafu) - } } pub struct Instance { @@ -292,12 +287,25 @@ pub struct StartCommand { impl StartCommand { fn load_options(&self, global_options: &GlobalOptions) -> Result { - let mut opts: StandaloneOptions = Options::load_layered_options( - self.config_file.as_deref(), - self.env_prefix.as_ref(), - StandaloneOptions::env_list_keys(), - )?; + Ok(Options::Standalone(Box::new( + self.merge_with_cli_options( + global_options, + StandaloneOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), + ) + .context(LoadLayeredConfigSnafu)?, + )?, + ))) + } + // The precedence order is: cli > config file > environment variables > default values. + fn merge_with_cli_options( + &self, + global_options: &GlobalOptions, + mut opts: StandaloneOptions, + ) -> Result { + // Should always be standalone mode. opts.mode = Mode::Standalone; if let Some(dir) = &global_options.log_dir { @@ -358,7 +366,7 @@ impl StartCommand { opts.user_provider.clone_from(&self.user_provider); - Ok(Options::Standalone(Box::new(opts))) + Ok(opts) } #[allow(unreachable_code)] @@ -541,13 +549,14 @@ mod tests { use auth::{Identity, Password, UserProviderRef}; use common_base::readable_size::ReadableSize; + use common_config::ENV_VAR_SEP; use common_test_util::temp_dir::create_named_temp_file; use common_wal::config::DatanodeWalConfig; use datanode::config::{FileConfig, GcsConfig}; use servers::Mode; use super::*; - use crate::options::{GlobalOptions, ENV_VAR_SEP}; + use crate::options::GlobalOptions; #[tokio::test] async fn test_try_from_start_command_to_anymap() { @@ -789,8 +798,8 @@ mod tests { #[test] fn test_load_default_standalone_options() { - let options: StandaloneOptions = - Options::load_layered_options(None, "GREPTIMEDB_FRONTEND", None).unwrap(); + let options = + StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap(); let default_options = StandaloneOptions::default(); assert_eq!(options.mode, default_options.mode); assert_eq!(options.enable_telemetry, default_options.enable_telemetry); diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml index e85f220c65..ab9149e60a 100644 --- a/src/common/config/Cargo.toml +++ b/src/common/config/Cargo.toml @@ -9,6 +9,22 @@ workspace = true [dependencies] common-base.workspace = true +common-error.workspace = true +common-macro.workspace = true +config.workspace = true num_cpus.workspace = true serde.workspace = true +serde_json.workspace = true +snafu.workspace = true sysinfo.workspace = true +toml.workspace = true + +[dev-dependencies] +common-telemetry.workspace = true +common-test-util.workspace = true +common-wal.workspace = true +datanode.workspace = true +meta-client.workspace = true +serde.workspace = true +temp-env = "0.3" +tempfile.workspace = true diff --git a/src/common/config/src/config.rs b/src/common/config/src/config.rs new file mode 100644 index 0000000000..c21735a059 --- /dev/null +++ b/src/common/config/src/config.rs @@ -0,0 +1,248 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use config::{Environment, File, FileFormat}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu, TomlFormatSnafu}; + +/// Separator for environment variables. For example, `DATANODE__STORAGE__MANIFEST__CHECKPOINT_MARGIN`. +pub const ENV_VAR_SEP: &str = "__"; + +/// Separator for list values in environment variables. For example, `localhost:3001,localhost:3002,localhost:3003`. +pub const ENV_LIST_SEP: &str = ","; + +/// Configuration trait defines the common interface for configuration that can be loaded from multiple sources and serialized to TOML. +pub trait Configurable<'de>: Serialize + Deserialize<'de> + Default + Sized { + /// Load the configuration from multiple sources and merge them. + /// The precedence order is: config file > environment variables > default values. + /// `env_prefix` is the prefix of environment variables, e.g. "FRONTEND__xxx". + /// The function will use dunder(double underscore) `__` as the separator for environment variables, for example: + /// `DATANODE__STORAGE__MANIFEST__CHECKPOINT_MARGIN` will be mapped to `DatanodeOptions.storage.manifest.checkpoint_margin` field in the configuration. + /// `list_keys` is the list of keys that should be parsed as a list, for example, you can pass `Some(&["meta_client_options.metasrv_addrs"]` to parse `GREPTIMEDB_METASRV__META_CLIENT_OPTIONS__METASRV_ADDRS` as a list. + /// The function will use comma `,` as the separator for list values, for example: `127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003`. + fn load_layered_options(config_file: Option<&str>, env_prefix: &str) -> Result { + let default_opts = Self::default(); + + let env_source = { + let mut env = Environment::default(); + + if !env_prefix.is_empty() { + env = env.prefix(env_prefix); + } + + if let Some(list_keys) = Self::env_list_keys() { + env = env.list_separator(ENV_LIST_SEP); + for key in list_keys { + env = env.with_list_parse_key(key); + } + } + + env.try_parsing(true) + .separator(ENV_VAR_SEP) + .ignore_empty(true) + }; + + // Workaround: Replacement for `Config::try_from(&default_opts)` due to + // `ConfigSerializer` cannot handle the case of an empty struct contained + // within an iterative structure. + // See: https://github.com/mehcode/config-rs/issues/461 + let json_str = serde_json::to_string(&default_opts).context(SerdeJsonSnafu)?; + let default_config = File::from_str(&json_str, FileFormat::Json); + + // Add default values and environment variables as the sources of the configuration. + let mut layered_config = config::Config::builder() + .add_source(default_config) + .add_source(env_source); + + // Add config file as the source of the configuration if it is specified. + if let Some(config_file) = config_file { + layered_config = layered_config.add_source(File::new(config_file, FileFormat::Toml)); + } + + let opts = layered_config + .build() + .and_then(|x| x.try_deserialize()) + .context(LoadLayeredConfigSnafu)?; + + Ok(opts) + } + + /// List of toml keys that should be parsed as a list. + fn env_list_keys() -> Option<&'static [&'static str]> { + None + } + + /// Serialize the configuration to a TOML string. + fn to_toml(&self) -> Result { + toml::to_string(&self).context(TomlFormatSnafu) + } +} + +#[cfg(test)] +mod tests { + use std::io::Write; + + use common_telemetry::logging::LoggingOptions; + use common_test_util::temp_dir::create_named_temp_file; + use common_wal::config::DatanodeWalConfig; + use datanode::config::{ObjectStoreConfig, StorageConfig}; + use meta_client::MetaClientOptions; + use serde::{Deserialize, Serialize}; + + use super::*; + use crate::Mode; + + #[derive(Debug, Serialize, Deserialize)] + struct TestDatanodeConfig { + mode: Mode, + node_id: Option, + logging: LoggingOptions, + meta_client: Option, + wal: DatanodeWalConfig, + storage: StorageConfig, + } + + impl Default for TestDatanodeConfig { + fn default() -> Self { + Self { + mode: Mode::Distributed, + node_id: None, + logging: LoggingOptions::default(), + meta_client: None, + wal: DatanodeWalConfig::default(), + storage: StorageConfig::default(), + } + } + } + + impl Configurable<'_> for TestDatanodeConfig { + fn env_list_keys() -> Option<&'static [&'static str]> { + Some(&["meta_client.metasrv_addrs"]) + } + } + + #[test] + fn test_load_layered_options() { + let mut file = create_named_temp_file(); + let toml_str = r#" + mode = "distributed" + enable_memory_catalog = false + rpc_addr = "127.0.0.1:3001" + rpc_hostname = "127.0.0.1" + rpc_runtime_size = 8 + mysql_addr = "127.0.0.1:4406" + mysql_runtime_size = 2 + + [meta_client] + timeout = "3s" + connect_timeout = "5s" + tcp_nodelay = true + + [wal] + provider = "raft_engine" + dir = "/tmp/greptimedb/wal" + file_size = "1GB" + purge_threshold = "50GB" + purge_interval = "10m" + read_batch_size = 128 + sync_write = false + + [logging] + level = "debug" + dir = "/tmp/greptimedb/test/logs" + "#; + write!(file, "{}", toml_str).unwrap(); + + let env_prefix = "DATANODE_UT"; + temp_env::with_vars( + // The following environment variables will be used to override the values in the config file. + [ + ( + // storage.type = S3 + [ + env_prefix.to_string(), + "storage".to_uppercase(), + "type".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("S3"), + ), + ( + // storage.bucket = mybucket + [ + env_prefix.to_string(), + "storage".to_uppercase(), + "bucket".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("mybucket"), + ), + ( + // wal.dir = /other/wal/dir + [ + env_prefix.to_string(), + "wal".to_uppercase(), + "dir".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("/other/wal/dir"), + ), + ( + // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003 + [ + env_prefix.to_string(), + "meta_client".to_uppercase(), + "metasrv_addrs".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"), + ), + ], + || { + let opts = TestDatanodeConfig::load_layered_options( + Some(file.path().to_str().unwrap()), + env_prefix, + ) + .unwrap(); + + // Check the configs from environment variables. + match &opts.storage.store { + ObjectStoreConfig::S3(s3_config) => { + assert_eq!(s3_config.bucket, "mybucket".to_string()); + } + _ => panic!("unexpected store type"), + } + assert_eq!( + opts.meta_client.unwrap().metasrv_addrs, + vec![ + "127.0.0.1:3001".to_string(), + "127.0.0.1:3002".to_string(), + "127.0.0.1:3003".to_string() + ] + ); + + // Should be the values from config file, not environment variables. + let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else { + unreachable!() + }; + assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal"); + + // Should be default values. + assert_eq!(opts.node_id, None); + }, + ); + } +} diff --git a/src/common/config/src/error.rs b/src/common/config/src/error.rs new file mode 100644 index 0000000000..fbce83fd00 --- /dev/null +++ b/src/common/config/src/error.rs @@ -0,0 +1,67 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use config::ConfigError; +use snafu::{Location, Snafu}; + +pub type Result = std::result::Result; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Failed to load layered config"))] + LoadLayeredConfig { + #[snafu(source)] + error: ConfigError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to serde json"))] + SerdeJson { + #[snafu(source)] + error: serde_json::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to serialize options to TOML"))] + TomlFormat { + #[snafu(source)] + error: toml::ser::Error, + #[snafu(implicit)] + location: Location, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::TomlFormat { .. } | Error::LoadLayeredConfig { .. } => { + StatusCode::InvalidArguments + } + Error::SerdeJson { .. } => StatusCode::Unexpected, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs index 672edaee51..d6fb9abb45 100644 --- a/src/common/config/src/lib.rs +++ b/src/common/config/src/lib.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod config; +pub mod error; pub mod utils; use common_base::readable_size::ReadableSize; +pub use config::*; use serde::{Deserialize, Serialize}; pub fn metadata_store_dir(store_dir: &str) -> String { diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 6a34918e24..26a7ccb675 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -18,6 +18,7 @@ bytes.workspace = true catalog.workspace = true client.workspace = true common-base.workspace = true +common-config.workspace = true common-error.workspace = true common-function.workspace = true common-greptimedb-telemetry.workspace = true diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 8ced4825bd..ec278d3c42 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -16,6 +16,7 @@ use common_base::readable_size::ReadableSize; use common_base::secrets::SecretString; +use common_config::Configurable; use common_grpc::channel_manager::{ DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, }; @@ -266,14 +267,10 @@ impl Default for DatanodeOptions { } } -impl DatanodeOptions { - pub fn env_list_keys() -> Option<&'static [&'static str]> { +impl Configurable<'_> for DatanodeOptions { + fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"]) } - - pub fn to_toml_string(&self) -> String { - toml::to_string(&self).unwrap() - } } #[allow(clippy::large_enum_variant)] diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 17233a41a4..ec3bccceeb 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -359,6 +359,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to serialize options to TOML"))] + TomlFormat { + #[snafu(implicit)] + location: Location, + #[snafu(source(from(common_config::error::Error, Box::new)))] + source: Box, + }, } pub type Result = std::result::Result; @@ -391,7 +399,8 @@ impl ErrorExt for Error { | MissingNodeId { .. } | ColumnNoneDefaultValue { .. } | MissingWalDirConfig { .. } - | MissingKvBackend { .. } => StatusCode::InvalidArguments, + | MissingKvBackend { .. } + | TomlFormat { .. } => StatusCode::InvalidArguments, PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => { StatusCode::Unexpected diff --git a/src/datanode/src/service.rs b/src/datanode/src/service.rs index 1ec2bd4eaa..dccacf6813 100644 --- a/src/datanode/src/service.rs +++ b/src/datanode/src/service.rs @@ -15,6 +15,7 @@ use std::net::SocketAddr; use std::sync::Arc; +use common_config::Configurable; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; @@ -23,7 +24,7 @@ use servers::server::{ServerHandler, ServerHandlers}; use snafu::ResultExt; use crate::config::DatanodeOptions; -use crate::error::{ParseAddrSnafu, Result}; +use crate::error::{ParseAddrSnafu, Result, TomlFormatSnafu}; use crate::region_server::RegionServer; pub struct DatanodeServiceBuilder<'a> { @@ -75,7 +76,7 @@ impl<'a> DatanodeServiceBuilder<'a> { if self.enable_http_service { let http_server = HttpServerBuilder::new(self.opts.http.clone()) .with_metrics_handler(MetricsHandler) - .with_greptime_config_options(self.opts.to_toml_string()) + .with_greptime_config_options(self.opts.to_toml().context(TomlFormatSnafu)?) .build(); let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu { addr: &self.opts.http.addr, diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 1a13aefb85..f7bd62081f 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -343,8 +343,10 @@ pub enum Error { #[snafu(display("Failed to serialize options to TOML"))] TomlFormat { - #[snafu(source)] - error: toml::ser::Error, + #[snafu(implicit)] + location: Location, + #[snafu(source(from(common_config::error::Error, Box::new)))] + source: Box, }, #[snafu(display("Failed to get cache from cache registry: {}", name))] diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 95d1acdc98..f0dfac1c7d 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_config::config::Configurable; use common_telemetry::logging::{LoggingOptions, TracingOptions}; use meta_client::MetaClientOptions; use serde::{Deserialize, Serialize}; @@ -19,9 +20,7 @@ use servers::export_metrics::ExportMetricsOption; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; use servers::Mode; -use snafu::prelude::*; -use crate::error::{Result, TomlFormatSnafu}; use crate::service_config::{ DatanodeOptions, GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions, PromStoreOptions, @@ -75,24 +74,10 @@ impl Default for FrontendOptions { } } -impl FrontendOptions { - pub fn env_list_keys() -> Option<&'static [&'static str]> { +impl Configurable<'_> for FrontendOptions { + fn env_list_keys() -> Option<&'static [&'static str]> { Some(&["meta_client.metasrv_addrs"]) } - - pub fn to_toml_string(&self) -> String { - toml::to_string(&self).unwrap() - } -} - -pub trait TomlSerializable { - fn to_toml(&self) -> Result; -} - -impl TomlSerializable for FrontendOptions { - fn to_toml(&self) -> Result { - toml::to_string(&self).context(TomlFormatSnafu) - } } #[cfg(test)] diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index f342762bbb..c0ae6b64c2 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -30,7 +30,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use catalog::CatalogManagerRef; use client::OutputData; use common_base::Plugins; -use common_config::KvBackendConfig; +use common_config::{Configurable, KvBackendConfig}; use common_error::ext::{BoxedError, ErrorExt}; use common_frontend::handler::FrontendInvoker; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -86,7 +86,7 @@ use crate::error::{ PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu, TableOperationSnafu, }; -use crate::frontend::{FrontendOptions, TomlSerializable}; +use crate::frontend::FrontendOptions; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; @@ -188,7 +188,7 @@ impl Instance { pub fn build_servers( &mut self, - opts: impl Into + TomlSerializable, + opts: impl Into + for<'de> Configurable<'de>, servers: ServerHandlers, ) -> Result<()> { let opts: FrontendOptions = opts.into(); diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index f7b5939279..2309b02734 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use auth::UserProviderRef; use common_base::Plugins; +use common_config::Configurable; use common_runtime::Builder as RuntimeBuilder; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; @@ -31,14 +32,14 @@ use servers::server::{Server, ServerHandlers}; use servers::tls::{maybe_watch_tls_config, ReloadableTlsServerConfig}; use snafu::ResultExt; -use crate::error::{self, Result, StartServerSnafu}; -use crate::frontend::{FrontendOptions, TomlSerializable}; +use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu}; +use crate::frontend::FrontendOptions; use crate::instance::FrontendInstance; use crate::service_config::GrpcOptions; pub struct Services where - T: Into + TomlSerializable + Clone, + T: Into + for<'de> Configurable<'de> + Clone, U: FrontendInstance, { opts: T, @@ -50,7 +51,7 @@ where impl Services where - T: Into + TomlSerializable + Clone, + T: Into + for<'de> Configurable<'de> + Clone, U: FrontendInstance, { pub fn new(opts: T, instance: Arc, plugins: Plugins) -> Self { @@ -171,7 +172,7 @@ where let opts = self.opts.clone(); let instance = self.instance.clone(); - let toml = opts.to_toml()?; + let toml = opts.to_toml().context(TomlFormatSnafu)?; let opts: FrontendOptions = opts.into(); let handlers = ServerHandlers::default(); diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 28468247de..923b704a4a 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -16,6 +16,7 @@ async-trait = "0.1" client.workspace = true common-base.workspace = true common-catalog.workspace = true +common-config.workspace = true common-error.workspace = true common-greptimedb-telemetry.workspace = true common-grpc.workspace = true diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 02f89ca9b5..f5ca9174ea 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -20,6 +20,7 @@ use api::v1::meta::lock_server::LockServer; use api::v1::meta::procedure_service_server::ProcedureServiceServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; +use common_config::Configurable; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -38,7 +39,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; -use crate::error::InitExportMetricsTaskSnafu; +use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu}; use crate::lock::etcd::EtcdLock; use crate::lock::memory::MemLock; use crate::metasrv::builder::MetasrvBuilder; @@ -73,7 +74,7 @@ impl MetasrvInstance { let httpsrv = Arc::new( HttpServerBuilder::new(opts.http.clone()) .with_metrics_handler(MetricsHandler) - .with_greptime_config_options(opts.to_toml_string()) + .with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?) .build(), ); // put metasrv into plugins for later use diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 0a53f0a667..e598a956d8 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -832,6 +832,14 @@ pub enum Error { location: Location, source: common_meta::error::Error, }, + + #[snafu(display("Failed to serialize options to TOML"))] + TomlFormat { + #[snafu(implicit)] + location: Location, + #[snafu(source(from(common_config::error::Error, Box::new)))] + source: Box, + }, } impl Error { @@ -903,7 +911,8 @@ impl ErrorExt for Error { | Error::InitExportMetricsTask { .. } | Error::InvalidHeartbeatRequest { .. } | Error::ProcedureNotFound { .. } - | Error::TooManyPartitions { .. } => StatusCode::InvalidArguments, + | Error::TooManyPartitions { .. } + | Error::TomlFormat { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } | Error::StatKeyFromUtf8 { .. } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 68b18f2f27..f058d49b4a 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -20,6 +20,7 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_base::Plugins; +use common_config::Configurable; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::ddl::ProcedureExecutorRef; @@ -113,12 +114,6 @@ pub struct MetasrvOptions { pub tracing: TracingOptions, } -impl MetasrvOptions { - pub fn env_list_keys() -> Option<&'static [&'static str]> { - Some(&["wal.broker_endpoints"]) - } -} - impl Default for MetasrvOptions { fn default() -> Self { Self { @@ -153,9 +148,9 @@ impl Default for MetasrvOptions { } } -impl MetasrvOptions { - pub fn to_toml_string(&self) -> String { - toml::to_string(&self).unwrap() +impl Configurable<'_> for MetasrvOptions { + fn env_list_keys() -> Option<&'static [&'static str]> { + Some(&["wal.broker_endpoints"]) } } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 3f9b441541..548404ece0 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -22,6 +22,7 @@ use auth::UserProviderRef; use axum::Router; use catalog::kvbackend::KvBackendCatalogManager; use common_base::secrets::ExposeSecret; +use common_config::Configurable; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_runtime::Builder as RuntimeBuilder; @@ -391,7 +392,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router None, ) .with_metrics_handler(MetricsHandler) - .with_greptime_config_options(instance.opts.datanode_options().to_toml_string()) + .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) .build(); (http_server.build(http_server.make_app()), instance.guard) } @@ -463,7 +464,7 @@ pub async fn setup_test_prom_app_with_frontend( ) .with_prom_handler(frontend_ref.clone(), true, is_strict_mode) .with_prometheus_handler(frontend_ref) - .with_greptime_config_options(instance.opts.datanode_options().to_toml_string()) + .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) .build(); let app = http_server.build(http_server.make_app()); (app, instance.guard)