diff --git a/Cargo.lock b/Cargo.lock index d103e0181b..9013fb86ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1518,6 +1518,7 @@ dependencies = [ "common-recordbatch", "common-telemetry", "common-test-util", + "config", "datanode", "either", "frontend", @@ -1534,6 +1535,7 @@ dependencies = [ "session", "snafu", "substrait 0.2.0", + "temp-env", "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", @@ -1878,6 +1880,25 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "config" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d379af7f68bfc21714c6c7dea883544201741d2ce8274bb12fa54f89507f52a7" +dependencies = [ + "async-trait", + "json5", + "lazy_static", + "nom", + "pathdiff", + "ron", + "rust-ini", + "serde", + "serde_json", + "toml", + "yaml-rust", +] + [[package]] name = "console" version = "0.15.5" @@ -4288,6 +4309,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "jsonwebtoken" version = "8.3.0" @@ -4493,6 +4525,12 @@ dependencies = [ "cc", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.3.4" @@ -5750,6 +5788,12 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" +[[package]] +name = "pathdiff" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" + [[package]] name = "peeking_take_while" version = "0.1.2" @@ -8660,6 +8704,15 @@ version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ae9980cab1db3fceee2f6c6f643d5d8de2997c58ee8d25fb0cc8a9e9e7348e5" +[[package]] +name = "temp-env" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9547444bfe52cbd79515c6c8087d8ae6ca8d64d2d31a27746320f5cb81d1a15c" +dependencies = [ + "parking_lot", +] + [[package]] name = "tempfile" version = "3.5.0" @@ -10282,6 +10335,15 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "zeroize" version = "1.6.0" diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index ea80dde095..1e82a0a9ee 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -25,6 +25,7 @@ common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry", features = [ "deadlock_detection", ] } +config = "0.13" datanode = { path = "../datanode" } either = "1.8" frontend = { path = "../frontend" } @@ -37,20 +38,19 @@ query = { path = "../query" } rustyline = "10.1" serde.workspace = true servers = { path = "../servers" } - session = { path = "../session" } snafu.workspace = true substrait = { path = "../common/substrait" } tikv-jemalloc-ctl = { version = "0.5", optional = true } tikv-jemallocator = { version = "0.5", optional = true } tokio.workspace = true -toml = "0.5" - [dev-dependencies] common-test-util = { path = "../common/test-util" } rexpect = "0.5" +temp-env = "0.3" serde.workspace = true +toml = "0.5" [build-dependencies] build-data = "0.1.3" diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index b3ce41450b..2eada5261b 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -23,7 +23,6 @@ use snafu::ResultExt; use crate::error::{MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu}; use crate::options::{Options, TopLevelOptions}; -use crate::toml_loader; pub struct Instance { datanode: Datanode, @@ -99,15 +98,14 @@ struct StartCommand { http_addr: Option, #[clap(long)] http_timeout: Option, + #[clap(long, default_value = "GREPTIMEDB_DATANODE")] + env_prefix: String, } impl StartCommand { fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { - let mut opts: DatanodeOptions = if let Some(path) = &self.config_file { - toml_loader::from_file!(path)? - } else { - DatanodeOptions::default() - }; + let mut opts: DatanodeOptions = + Options::load_layered_options(self.config_file.as_deref(), self.env_prefix.as_ref())?; if let Some(dir) = top_level_opts.log_dir { opts.logging.dir = dir; @@ -116,26 +114,27 @@ impl StartCommand { opts.logging.level = level; } - if let Some(addr) = self.rpc_addr.clone() { - opts.rpc_addr = addr; + if let Some(addr) = &self.rpc_addr { + opts.rpc_addr = addr.clone(); } if self.rpc_hostname.is_some() { opts.rpc_hostname = self.rpc_hostname.clone(); } - if let Some(addr) = self.mysql_addr.clone() { - opts.mysql_addr = addr; + if let Some(addr) = &self.mysql_addr { + opts.mysql_addr = addr.clone(); } if let Some(node_id) = self.node_id { opts.node_id = Some(node_id); } - if let Some(meta_addr) = self.metasrv_addr.clone() { + if let Some(meta_addr) = &self.metasrv_addr { opts.meta_client_options .get_or_insert_with(MetaClientOptions::default) .metasrv_addrs = meta_addr + .clone() .split(',') .map(&str::trim) .map(&str::to_string) @@ -150,16 +149,20 @@ impl StartCommand { .fail(); } - if let Some(data_dir) = self.data_dir.clone() { - opts.storage.store = ObjectStoreConfig::File(FileConfig { data_dir }); + if let Some(data_dir) = &self.data_dir { + opts.storage.store = ObjectStoreConfig::File(FileConfig { + data_dir: data_dir.clone(), + }); } - if let Some(wal_dir) = self.wal_dir.clone() { - opts.wal.dir = wal_dir; + if let Some(wal_dir) = &self.wal_dir { + opts.wal.dir = wal_dir.clone(); } - if let Some(http_addr) = self.http_addr.clone() { - opts.http_opts.addr = http_addr + + if let Some(http_addr) = &self.http_addr { + opts.http_opts.addr = http_addr.clone(); } + if let Some(http_timeout) = self.http_timeout { opts.http_opts.timeout = Duration::from_secs(http_timeout) } @@ -191,6 +194,7 @@ mod tests { use servers::Mode; use super::*; + use crate::options::ENV_VAR_SEP; #[test] fn test_read_from_config_file() { @@ -350,4 +354,110 @@ mod tests { assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir); assert_eq!("debug", logging_opt.level); } + + #[test] + fn test_config_precedence_order() { + let mut file = create_named_temp_file(); + let toml_str = r#" + mode = "distributed" + enable_memory_catalog = false + node_id = 42 + rpc_addr = "127.0.0.1:3001" + rpc_hostname = "127.0.0.1" + rpc_runtime_size = 8 + mysql_addr = "127.0.0.1:4406" + mysql_runtime_size = 2 + + [meta_client_options] + metasrv_addrs = ["127.0.0.1:3002"] + timeout_millis = 3000 + connect_timeout_millis = 5000 + tcp_nodelay = true + + [wal] + dir = "/tmp/greptimedb/wal" + file_size = "1GB" + purge_threshold = "50GB" + purge_interval = "10m" + read_batch_size = 128 + sync_write = false + + [storage] + type = "File" + data_dir = "/tmp/greptimedb/data/" + + [storage.compaction] + max_inflight_tasks = 3 + max_files_in_level0 = 7 + max_purge_tasks = 32 + + [storage.manifest] + checkpoint_on_startup = true + + [logging] + level = "debug" + dir = "/tmp/greptimedb/test/logs" + "#; + write!(file, "{}", toml_str).unwrap(); + + let env_prefix = "DATANODE_UT"; + temp_env::with_vars( + vec![ + ( + // storage.manifest.gc_duration = 9s + vec![ + env_prefix.to_string(), + "storage".to_uppercase(), + "manifest".to_uppercase(), + "gc_duration".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("9s"), + ), + ( + // storage.compaction.max_purge_tasks = 99 + vec![ + env_prefix.to_string(), + "storage".to_uppercase(), + "compaction".to_uppercase(), + "max_purge_tasks".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("99"), + ), + ], + || { + let command = StartCommand { + config_file: Some(file.path().to_str().unwrap().to_string()), + wal_dir: Some("/other/wal/dir".to_string()), + env_prefix: env_prefix.to_string(), + ..Default::default() + }; + + let Options::Datanode(opts) = + command.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()}; + + // Should be read from env, env > default values. + assert_eq!( + opts.storage.manifest.gc_duration, + Some(Duration::from_secs(9)) + ); + + // Should be read from config file, config file > env > default values. + assert_eq!(opts.storage.compaction.max_purge_tasks, 32); + + // Should be read from cli, cli > config file > env > default values. + assert_eq!(opts.wal.dir, "/other/wal/dir"); + + // Should be default value. + assert_eq!( + opts.storage.manifest.checkpoint_margin, + DatanodeOptions::default() + .storage + .manifest + .checkpoint_margin + ); + }, + ); + } } diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index f680a33c7e..1a11dddae8 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_error::prelude::*; +use config::ConfigError; use rustyline::error::ReadlineError; use snafu::Location; @@ -70,12 +71,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to parse config, source: {}", source))] - ParseConfig { - source: toml::de::Error, - location: Location, - }, - #[snafu(display("Missing config, msg: {}", msg))] MissingConfig { msg: String, location: Location }, @@ -153,6 +148,12 @@ pub enum Error { #[snafu(backtrace)] source: substrait::error::Error, }, + + #[snafu(display("Failed to load layered config, source: {}", source))] + LoadLayeredConfig { + source: ConfigError, + location: Location, + }, } pub type Result = std::result::Result; @@ -168,13 +169,12 @@ impl ErrorExt for Error { Error::ShutdownMetaServer { source } => source.status_code(), Error::BuildMetaServer { source } => source.status_code(), Error::UnsupportedSelectorType { source, .. } => source.status_code(), - Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => { - StatusCode::InvalidArguments - } - Error::IllegalConfig { .. } | Error::InvalidReplCommand { .. } => { - StatusCode::InvalidArguments - } - Error::IllegalAuthConfig { .. } => StatusCode::InvalidArguments, + Error::ReadConfig { .. } + | Error::MissingConfig { .. } + | Error::LoadLayeredConfig { .. } + | Error::IllegalConfig { .. } + | Error::InvalidReplCommand { .. } + | Error::IllegalAuthConfig { .. } => StatusCode::InvalidArguments, Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal, Error::RequestDatabase { source, .. } => source.status_code(), Error::CollectRecordBatches { source } | Error::PrettyPrintRecordBatches { source } => { diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index ef58f0deca..a082a2ab1f 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -17,12 +17,8 @@ use std::sync::Arc; use clap::Parser; use common_base::Plugins; use frontend::frontend::FrontendOptions; -use frontend::grpc::GrpcOptions; use frontend::influxdb::InfluxdbOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; -use frontend::mysql::MysqlOptions; -use frontend::opentsdb::OpentsdbOptions; -use frontend::postgres::PostgresOptions; use frontend::prom::PromOptions; use meta_client::MetaClientOptions; use servers::auth::UserProviderRef; @@ -32,7 +28,6 @@ use snafu::ResultExt; use crate::error::{self, IllegalAuthConfigSnafu, Result}; use crate::options::{Options, TopLevelOptions}; -use crate::toml_loader; pub struct Instance { frontend: FeInstance, @@ -89,7 +84,7 @@ impl SubCommand { } } -#[derive(Debug, Parser)] +#[derive(Debug, Default, Parser)] pub struct StartCommand { #[clap(long)] http_addr: Option, @@ -119,15 +114,14 @@ pub struct StartCommand { user_provider: Option, #[clap(long)] disable_dashboard: Option, + #[clap(long, default_value = "GREPTIMEDB_FRONTEND")] + env_prefix: String, } impl StartCommand { fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { - let mut opts: FrontendOptions = if let Some(path) = &self.config_file { - toml_loader::from_file!(path)? - } else { - FrontendOptions::default() - }; + let mut opts: FrontendOptions = + Options::load_layered_options(self.config_file.as_deref(), self.env_prefix.as_ref())?; if let Some(dir) = top_level_opts.log_dir { opts.logging.dir = dir; @@ -136,14 +130,16 @@ impl StartCommand { opts.logging.level = level; } - let tls_option = TlsOption::new( + let tls_opts = TlsOption::new( self.tls_mode.clone(), self.tls_cert_path.clone(), self.tls_key_path.clone(), ); - if let Some(addr) = self.http_addr.clone() { - opts.http_options.get_or_insert_with(Default::default).addr = addr; + if let Some(addr) = &self.http_addr { + if let Some(http_opts) = &mut opts.http_options { + http_opts.addr = addr.clone() + } } if let Some(disable_dashboard) = self.disable_dashboard { @@ -152,43 +148,45 @@ impl StartCommand { .disable_dashboard = disable_dashboard; } - if let Some(addr) = self.grpc_addr.clone() { - opts.grpc_options = Some(GrpcOptions { - addr, - ..Default::default() - }); + if let Some(addr) = &self.grpc_addr { + if let Some(grpc_opts) = &mut opts.grpc_options { + grpc_opts.addr = addr.clone() + } } - if let Some(addr) = self.mysql_addr.clone() { - opts.mysql_options = Some(MysqlOptions { - addr, - tls: tls_option.clone(), - ..Default::default() - }); + if let Some(addr) = &self.mysql_addr { + if let Some(mysql_opts) = &mut opts.mysql_options { + mysql_opts.addr = addr.clone(); + mysql_opts.tls = tls_opts.clone(); + } } - if let Some(addr) = self.prom_addr.clone() { - opts.prom_options = Some(PromOptions { addr }); + + if let Some(addr) = &self.prom_addr { + opts.prom_options = Some(PromOptions { addr: addr.clone() }); } - if let Some(addr) = self.postgres_addr.clone() { - opts.postgres_options = Some(PostgresOptions { - addr, - tls: tls_option, - ..Default::default() - }); + + if let Some(addr) = &self.postgres_addr { + if let Some(postgres_opts) = &mut opts.postgres_options { + postgres_opts.addr = addr.clone(); + postgres_opts.tls = tls_opts; + } } - if let Some(addr) = self.opentsdb_addr.clone() { - opts.opentsdb_options = Some(OpentsdbOptions { - addr, - ..Default::default() - }); + + if let Some(addr) = &self.opentsdb_addr { + if let Some(opentsdb_addr) = &mut opts.opentsdb_options { + opentsdb_addr.addr = addr.clone(); + } } + if let Some(enable) = self.influxdb_enable { opts.influxdb_options = Some(InfluxdbOptions { enable }); } - if let Some(metasrv_addr) = self.metasrv_addr.clone() { + + if let Some(metasrv_addr) = &self.metasrv_addr { opts.meta_client_options .get_or_insert_with(MetaClientOptions::default) .metasrv_addrs = metasrv_addr + .clone() .split(',') .map(&str::trim) .map(&str::to_string) @@ -231,27 +229,23 @@ mod tests { use std::time::Duration; use common_test_util::temp_dir::create_named_temp_file; + use frontend::grpc::GrpcOptions; use servers::auth::{Identity, Password, UserProviderRef}; use super::*; + use crate::options::ENV_VAR_SEP; #[test] fn test_try_from_start_command() { let command = StartCommand { http_addr: Some("127.0.0.1:1234".to_string()), - grpc_addr: None, prom_addr: Some("127.0.0.1:4444".to_string()), mysql_addr: Some("127.0.0.1:5678".to_string()), postgres_addr: Some("127.0.0.1:5432".to_string()), opentsdb_addr: Some("127.0.0.1:4321".to_string()), influxdb_enable: Some(false), - config_file: None, - metasrv_addr: None, - tls_mode: None, - tls_cert_path: None, - tls_key_path: None, - user_provider: None, disable_dashboard: Some(false), + ..Default::default() }; let Options::Frontend(opts) = @@ -307,20 +301,9 @@ mod tests { write!(file, "{}", toml_str).unwrap(); let command = StartCommand { - http_addr: None, - grpc_addr: None, - mysql_addr: None, - prom_addr: None, - postgres_addr: None, - opentsdb_addr: None, - influxdb_enable: None, config_file: Some(file.path().to_str().unwrap().to_string()), - metasrv_addr: None, - tls_mode: None, - tls_cert_path: None, - tls_key_path: None, - user_provider: None, disable_dashboard: Some(false), + ..Default::default() }; let Options::Frontend(fe_opts) = @@ -342,20 +325,9 @@ mod tests { #[tokio::test] async fn test_try_from_start_command_to_anymap() { let command = StartCommand { - http_addr: None, - grpc_addr: None, - mysql_addr: None, - prom_addr: None, - postgres_addr: None, - opentsdb_addr: None, - influxdb_enable: None, - config_file: None, - metasrv_addr: None, - tls_mode: None, - tls_cert_path: None, - tls_key_path: None, user_provider: Some("static_user_provider:cmd:test=test".to_string()), disable_dashboard: Some(false), + ..Default::default() }; let plugins = load_frontend_plugins(&command.user_provider); @@ -377,20 +349,8 @@ mod tests { #[test] fn test_top_level_options() { let cmd = StartCommand { - http_addr: None, - grpc_addr: None, - mysql_addr: None, - prom_addr: None, - postgres_addr: None, - opentsdb_addr: None, - influxdb_enable: None, - config_file: None, - metasrv_addr: None, - tls_mode: None, - tls_cert_path: None, - tls_key_path: None, - user_provider: None, disable_dashboard: Some(false), + ..Default::default() }; let options = cmd @@ -404,4 +364,91 @@ mod tests { assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir); assert_eq!("debug", logging_opt.level); } + + #[test] + fn test_config_precedence_order() { + let mut file = create_named_temp_file(); + let toml_str = r#" + mode = "distributed" + + [http_options] + addr = "127.0.0.1:4000" + + [mysql_options] + addr = "127.0.0.1:4002" + "#; + write!(file, "{}", toml_str).unwrap(); + + let env_prefix = "FRONTEND_UT"; + temp_env::with_vars( + vec![ + ( + // mysql_options.addr = 127.0.0.1:14002 + vec![ + env_prefix.to_string(), + "mysql_options".to_uppercase(), + "addr".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("127.0.0.1:14002"), + ), + ( + // mysql_options.runtime_size = 11 + vec![ + env_prefix.to_string(), + "mysql_options".to_uppercase(), + "runtime_size".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("11"), + ), + ( + // http_options.addr = 127.0.0.1:24000 + vec![ + env_prefix.to_string(), + "http_options".to_uppercase(), + "addr".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("127.0.0.1:24000"), + ), + ], + || { + let command = StartCommand { + config_file: Some(file.path().to_str().unwrap().to_string()), + http_addr: Some("127.0.0.1:14000".to_string()), + env_prefix: env_prefix.to_string(), + ..Default::default() + }; + + let top_level_opts = TopLevelOptions { + log_dir: None, + log_level: Some("error".to_string()), + }; + let Options::Frontend(fe_opts) = + command.load_options(top_level_opts).unwrap() else {unreachable!()}; + + // Should be read from env, env > default values. + assert_eq!(fe_opts.mysql_options.as_ref().unwrap().runtime_size, 11); + + // Should be read from config file, config file > env > default values. + assert_eq!( + fe_opts.mysql_options.as_ref().unwrap().addr, + "127.0.0.1:4002" + ); + + // Should be read from cli, cli > config file > env > default values. + assert_eq!( + fe_opts.http_options.as_ref().unwrap().addr, + "127.0.0.1:14000" + ); + + // Should be default value. + assert_eq!( + fe_opts.grpc_options.as_ref().unwrap().addr, + GrpcOptions::default().addr + ); + }, + ); + } } diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index df2a46199b..b10af43095 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -21,4 +21,3 @@ pub mod frontend; pub mod metasrv; pub mod options; pub mod standalone; -mod toml_loader; diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index bc60c8da64..59edf69b1a 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -20,9 +20,8 @@ use meta_srv::bootstrap::MetaSrvInstance; use meta_srv::metasrv::MetaSrvOptions; use snafu::ResultExt; -use crate::error::Result; +use crate::error::{self, Result}; use crate::options::{Options, TopLevelOptions}; -use crate::{error, toml_loader}; pub struct Instance { instance: MetaSrvInstance, @@ -79,7 +78,7 @@ impl SubCommand { } } -#[derive(Debug, Parser)] +#[derive(Debug, Default, Parser)] struct StartCommand { #[clap(long)] bind_addr: Option, @@ -97,15 +96,14 @@ struct StartCommand { http_addr: Option, #[clap(long)] http_timeout: Option, + #[clap(long, default_value = "GREPTIMEDB_METASRV")] + env_prefix: String, } impl StartCommand { fn load_options(&self, top_level_opts: TopLevelOptions) -> Result { - let mut opts: MetaSrvOptions = if let Some(path) = &self.config_file { - toml_loader::from_file!(path)? - } else { - MetaSrvOptions::default() - }; + let mut opts: MetaSrvOptions = + Options::load_layered_options(self.config_file.as_deref(), self.env_prefix.as_ref())?; if let Some(dir) = top_level_opts.log_dir { opts.logging.dir = dir; @@ -114,15 +112,18 @@ impl StartCommand { opts.logging.level = level; } - if let Some(addr) = self.bind_addr.clone() { - opts.bind_addr = addr; + if let Some(addr) = &self.bind_addr { + opts.bind_addr = addr.clone(); } - if let Some(addr) = self.server_addr.clone() { - opts.server_addr = addr; + + if let Some(addr) = &self.server_addr { + opts.server_addr = addr.clone(); } - if let Some(addr) = self.store_addr.clone() { - opts.store_addr = addr; + + if let Some(addr) = &self.store_addr { + opts.store_addr = addr.clone(); } + if let Some(selector_type) = &self.selector { opts.selector = selector_type[..] .try_into() @@ -133,9 +134,10 @@ impl StartCommand { opts.use_memory_store = true; } - if let Some(http_addr) = self.http_addr.clone() { - opts.http_opts.addr = http_addr; + if let Some(http_addr) = &self.http_addr { + opts.http_opts.addr = http_addr.clone(); } + if let Some(http_timeout) = self.http_timeout { opts.http_opts.timeout = Duration::from_secs(http_timeout); } @@ -167,6 +169,7 @@ mod tests { use meta_srv::selector::SelectorType; use super::*; + use crate::options::ENV_VAR_SEP; #[test] fn test_read_from_cmd() { @@ -174,11 +177,8 @@ mod tests { bind_addr: Some("127.0.0.1:3002".to_string()), server_addr: Some("127.0.0.1:3002".to_string()), store_addr: Some("127.0.0.1:2380".to_string()), - config_file: None, selector: Some("LoadBased".to_string()), - use_memory_store: false, - http_addr: None, - http_timeout: None, + ..Default::default() }; let Options::Metasrv(options) = @@ -206,14 +206,8 @@ mod tests { write!(file, "{}", toml_str).unwrap(); let cmd = StartCommand { - bind_addr: None, - server_addr: None, - store_addr: None, - selector: None, config_file: Some(file.path().to_str().unwrap().to_string()), - use_memory_store: false, - http_addr: None, - http_timeout: None, + ..Default::default() }; let Options::Metasrv(options) = @@ -233,11 +227,8 @@ mod tests { bind_addr: Some("127.0.0.1:3002".to_string()), server_addr: Some("127.0.0.1:3002".to_string()), store_addr: Some("127.0.0.1:2380".to_string()), - config_file: None, selector: Some("LoadBased".to_string()), - use_memory_store: false, - http_addr: None, - http_timeout: None, + ..Default::default() }; let options = cmd @@ -251,4 +242,72 @@ mod tests { assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir); assert_eq!("debug", logging_opt.level); } + + #[test] + fn test_config_precedence_order() { + let mut file = create_named_temp_file(); + let toml_str = r#" + server_addr = "127.0.0.1:3002" + datanode_lease_secs = 15 + selector = "LeaseBased" + use_memory_store = false + + [http_options] + addr = "127.0.0.1:4000" + + [logging] + level = "debug" + dir = "/tmp/greptimedb/test/logs" + "#; + write!(file, "{}", toml_str).unwrap(); + + let env_prefix = "METASRV_UT"; + temp_env::with_vars( + vec![ + ( + // bind_addr = 127.0.0.1:14002 + vec![env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP), + Some("127.0.0.1:14002"), + ), + ( + // server_addr = 127.0.0.1:13002 + vec![env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP), + Some("127.0.0.1:13002"), + ), + ( + // http_options.addr = 127.0.0.1:24000 + vec![ + env_prefix.to_string(), + "http_options".to_uppercase(), + "addr".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("127.0.0.1:24000"), + ), + ], + || { + let command = StartCommand { + http_addr: Some("127.0.0.1:14000".to_string()), + config_file: Some(file.path().to_str().unwrap().to_string()), + env_prefix: env_prefix.to_string(), + ..Default::default() + }; + + let Options::Metasrv(opts) = + command.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()}; + + // Should be read from env, env > default values. + assert_eq!(opts.bind_addr, "127.0.0.1:14002"); + + // Should be read from config file, config file > env > default values. + assert_eq!(opts.server_addr, "127.0.0.1:3002"); + + // Should be read from cli, cli > config file > env > default values. + assert_eq!(opts.http_opts.addr, "127.0.0.1:14000"); + + // Should be default value. + assert_eq!(opts.store_addr, "127.0.0.1:2379"); + }, + ); + } } diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index eeec11f2aa..595cf099b9 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -11,10 +11,18 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + use common_telemetry::logging::LoggingOptions; +use config::{Config, Environment, File, FileFormat}; use datanode::datanode::DatanodeOptions; use frontend::frontend::FrontendOptions; use meta_srv::metasrv::MetaSrvOptions; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::error::{LoadLayeredConfigSnafu, Result}; + +pub const ENV_VAR_SEP: &str = "__"; pub struct MixOptions { pub fe_opts: FrontendOptions, @@ -30,6 +38,12 @@ pub enum Options { Cli(Box), } +#[derive(Clone, Debug, Default)] +pub struct TopLevelOptions { + pub log_dir: Option, + pub log_level: Option, +} + impl Options { pub fn logging_options(&self) -> &LoggingOptions { match self { @@ -40,10 +54,190 @@ impl Options { Options::Cli(opts) => opts, } } + + /// Load the configuration from multiple sources and merge them. + /// The precedence order is: config file > environment variables > default values. + /// `env_prefix` is the prefix of environment variables, e.g. "FRONTEND__xxx". + /// The function will use dunder(double underscore) `__` as the separator for environment variables, for example: + /// `DATANODE__STORAGE__MANIFEST__CHECKPOINT_MARGIN` will be mapped to `DatanodeOptions.storage.manifest.checkpoint_margin` field in the configuration. + pub fn load_layered_options<'de, T: Serialize + Deserialize<'de> + Default>( + config_file: Option<&str>, + env_prefix: &str, + ) -> Result { + let default_opts = T::default(); + + let env_source = { + let mut env = Environment::default(); + + if !env_prefix.is_empty() { + env = env.prefix(env_prefix); + } + + env.try_parsing(true) + .separator(ENV_VAR_SEP) + .ignore_empty(true) + }; + + // Add default values and environment variables as the sources of the configuration. + let mut layered_config = Config::builder() + .add_source(Config::try_from(&default_opts).context(LoadLayeredConfigSnafu)?) + .add_source(env_source); + + // Add config file as the source of the configuration if it is specified. + if let Some(config_file) = config_file { + layered_config = layered_config.add_source(File::new(config_file, FileFormat::Toml)); + } + + let opts = layered_config + .build() + .context(LoadLayeredConfigSnafu)? + .try_deserialize() + .context(LoadLayeredConfigSnafu)?; + + Ok(opts) + } } -#[derive(Clone, Debug, Default)] -pub struct TopLevelOptions { - pub log_dir: Option, - pub log_level: Option, +#[cfg(test)] +mod tests { + use std::io::Write; + use std::time::Duration; + + use common_test_util::temp_dir::create_named_temp_file; + use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; + + use super::*; + + #[test] + fn test_load_layered_options() { + let mut file = create_named_temp_file(); + let toml_str = r#" + mode = "distributed" + enable_memory_catalog = false + rpc_addr = "127.0.0.1:3001" + rpc_hostname = "127.0.0.1" + rpc_runtime_size = 8 + mysql_addr = "127.0.0.1:4406" + mysql_runtime_size = 2 + + [meta_client_options] + metasrv_addrs = ["127.0.0.1:3002"] + timeout_millis = 3000 + connect_timeout_millis = 5000 + tcp_nodelay = true + + [wal] + dir = "/tmp/greptimedb/wal" + file_size = "1GB" + purge_threshold = "50GB" + purge_interval = "10m" + read_batch_size = 128 + sync_write = false + + [storage.compaction] + max_inflight_tasks = 3 + max_files_in_level0 = 7 + max_purge_tasks = 32 + + [logging] + level = "debug" + dir = "/tmp/greptimedb/test/logs" + "#; + write!(file, "{}", toml_str).unwrap(); + + let env_prefix = "DATANODE_UT"; + temp_env::with_vars( + // The following environment variables will be used to override the values in the config file. + vec![ + ( + // storage.manifest.checkpoint_margin = 99 + vec![ + env_prefix.to_string(), + "storage".to_uppercase(), + "manifest".to_uppercase(), + "checkpoint_margin".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("99"), + ), + ( + // storage.type = S3 + vec![ + env_prefix.to_string(), + "storage".to_uppercase(), + "type".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("S3"), + ), + ( + // storage.bucket = mybucket + vec![ + env_prefix.to_string(), + "storage".to_uppercase(), + "bucket".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("mybucket"), + ), + ( + // storage.manifest.gc_duration = 42s + vec![ + env_prefix.to_string(), + "storage".to_uppercase(), + "manifest".to_uppercase(), + "gc_duration".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("42s"), + ), + ( + // storage.manifest.checkpoint_on_startup = true + vec![ + env_prefix.to_string(), + "storage".to_uppercase(), + "manifest".to_uppercase(), + "checkpoint_on_startup".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("true"), + ), + ( + // wal.dir = /other/wal/dir + vec![ + env_prefix.to_string(), + "wal".to_uppercase(), + "dir".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("/other/wal/dir"), + ), + ], + || { + let opts: DatanodeOptions = + Options::load_layered_options(Some(file.path().to_str().unwrap()), env_prefix) + .unwrap(); + + // Check the configs from environment variables. + assert_eq!(opts.storage.manifest.checkpoint_margin, Some(99)); + match opts.storage.store { + ObjectStoreConfig::S3(s3_config) => { + assert_eq!(s3_config.bucket, "mybucket".to_string()); + } + _ => panic!("unexpected store type"), + } + assert_eq!( + opts.storage.manifest.gc_duration, + Some(Duration::from_secs(42)) + ); + assert!(opts.storage.manifest.checkpoint_on_startup); + + // Should be the values from config file, not environment variables. + assert_eq!(opts.wal.dir, "/tmp/greptimedb/wal".to_string()); + + // Should be default values. + assert_eq!(opts.node_id, None); + }, + ); + } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index e29db68c75..7016e0c5ad 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -41,7 +41,6 @@ use crate::error::{ }; use crate::frontend::load_frontend_plugins; use crate::options::{MixOptions, Options, TopLevelOptions}; -use crate::toml_loader; #[derive(Parser)] pub struct Command { @@ -184,7 +183,7 @@ impl Instance { } } -#[derive(Debug, Parser)] +#[derive(Debug, Default, Parser)] struct StartCommand { #[clap(long)] http_addr: Option, @@ -212,43 +211,42 @@ struct StartCommand { tls_key_path: Option, #[clap(long)] user_provider: Option, + #[clap(long, default_value = "GREPTIMEDB_STANDALONE")] + env_prefix: String, } impl StartCommand { fn load_options(&self, top_level_options: TopLevelOptions) -> Result { - let enable_memory_catalog = self.enable_memory_catalog; - let config_file = &self.config_file; - let mut opts: StandaloneOptions = if let Some(path) = config_file { - toml_loader::from_file!(path)? - } else { - StandaloneOptions::default() - }; + let mut opts: StandaloneOptions = + Options::load_layered_options(self.config_file.as_deref(), self.env_prefix.as_ref())?; - opts.enable_memory_catalog = enable_memory_catalog; + opts.enable_memory_catalog = self.enable_memory_catalog; - let mut fe_opts = opts.clone().frontend_options(); - let mut logging = opts.logging.clone(); - let dn_opts = opts.datanode_options(); + opts.mode = Mode::Standalone; if let Some(dir) = top_level_options.log_dir { - logging.dir = dir; + opts.logging.dir = dir; } if let Some(level) = top_level_options.log_level { - logging.level = level; + opts.logging.level = level; } - fe_opts.mode = Mode::Standalone; + let tls_opts = TlsOption::new( + self.tls_mode.clone(), + self.tls_cert_path.clone(), + self.tls_key_path.clone(), + ); - if let Some(addr) = self.http_addr.clone() { - fe_opts.http_options = Some(HttpOptions { - addr, - ..Default::default() - }); + if let Some(addr) = &self.http_addr { + if let Some(http_opts) = &mut opts.http_options { + http_opts.addr = addr.clone() + } } - if let Some(addr) = self.rpc_addr.clone() { + + if let Some(addr) = &self.rpc_addr { // frontend grpc addr conflict with datanode default grpc addr let datanode_grpc_addr = DatanodeOptions::default().rpc_addr; - if addr == datanode_grpc_addr { + if addr.eq(&datanode_grpc_addr) { return IllegalConfigSnafu { msg: format!( "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}", @@ -256,56 +254,42 @@ impl StartCommand { } .fail(); } - fe_opts.grpc_options = Some(GrpcOptions { - addr, - ..Default::default() - }); + if let Some(grpc_opts) = &mut opts.grpc_options { + grpc_opts.addr = addr.clone() + } } - if let Some(addr) = self.mysql_addr.clone() { - fe_opts.mysql_options = Some(MysqlOptions { - addr, - ..Default::default() - }) + if let Some(addr) = &self.mysql_addr { + if let Some(mysql_opts) = &mut opts.mysql_options { + mysql_opts.addr = addr.clone(); + mysql_opts.tls = tls_opts.clone(); + } } - if let Some(addr) = self.prom_addr.clone() { - fe_opts.prom_options = Some(PromOptions { addr }) + if let Some(addr) = &self.prom_addr { + opts.prom_options = Some(PromOptions { addr: addr.clone() }) } - if let Some(addr) = self.postgres_addr.clone() { - fe_opts.postgres_options = Some(PostgresOptions { - addr, - ..Default::default() - }) + if let Some(addr) = &self.postgres_addr { + if let Some(postgres_opts) = &mut opts.postgres_options { + postgres_opts.addr = addr.clone(); + postgres_opts.tls = tls_opts; + } } - if let Some(addr) = self.opentsdb_addr.clone() { - fe_opts.opentsdb_options = Some(OpentsdbOptions { - addr, - ..Default::default() - }); + if let Some(addr) = &self.opentsdb_addr { + if let Some(opentsdb_addr) = &mut opts.opentsdb_options { + opentsdb_addr.addr = addr.clone(); + } } if self.influxdb_enable { - fe_opts.influxdb_options = Some(InfluxdbOptions { enable: true }); + opts.influxdb_options = Some(InfluxdbOptions { enable: true }); } - let tls_option = TlsOption::new( - self.tls_mode.clone(), - self.tls_cert_path.clone(), - self.tls_key_path.clone(), - ); - - if let Some(mut mysql_options) = fe_opts.mysql_options { - mysql_options.tls = tls_option.clone(); - fe_opts.mysql_options = Some(mysql_options); - } - - if let Some(mut postgres_options) = fe_opts.postgres_options { - postgres_options.tls = tls_option; - fe_opts.postgres_options = Some(postgres_options); - } + let fe_opts = opts.clone().frontend_options(); + let logging = opts.logging.clone(); + let dn_opts = opts.datanode_options(); Ok(Options::Standalone(Box::new(MixOptions { fe_opts, @@ -351,6 +335,7 @@ async fn build_frontend( #[cfg(test)] mod tests { + use std::default::Default; use std::io::Write; use std::time::Duration; @@ -359,23 +344,13 @@ mod tests { use servers::Mode; use super::*; + use crate::options::ENV_VAR_SEP; #[tokio::test] async fn test_try_from_start_command_to_anymap() { let command = StartCommand { - http_addr: None, - rpc_addr: None, - prom_addr: None, - mysql_addr: None, - postgres_addr: None, - opentsdb_addr: None, - config_file: None, - influxdb_enable: false, - enable_memory_catalog: false, - tls_mode: None, - tls_cert_path: None, - tls_key_path: None, user_provider: Some("static_user_provider:cmd:test=test".to_string()), + ..Default::default() }; let plugins = load_frontend_plugins(&command.user_provider); @@ -441,19 +416,9 @@ mod tests { "#; write!(file, "{}", toml_str).unwrap(); let cmd = StartCommand { - http_addr: None, - rpc_addr: None, - prom_addr: None, - mysql_addr: None, - postgres_addr: None, - opentsdb_addr: None, config_file: Some(file.path().to_str().unwrap().to_string()), - influxdb_enable: false, - enable_memory_catalog: false, - tls_mode: None, - tls_cert_path: None, - tls_key_path: None, user_provider: Some("static_user_provider:cmd:test=test".to_string()), + ..Default::default() }; let Options::Standalone(options) = cmd.load_options(TopLevelOptions::default()).unwrap() else {unreachable!()}; @@ -504,19 +469,8 @@ mod tests { #[test] fn test_top_level_options() { let cmd = StartCommand { - http_addr: None, - rpc_addr: None, - prom_addr: None, - mysql_addr: None, - postgres_addr: None, - opentsdb_addr: None, - config_file: None, - influxdb_enable: false, - enable_memory_catalog: false, - tls_mode: None, - tls_cert_path: None, - tls_key_path: None, user_provider: Some("static_user_provider:cmd:test=test".to_string()), + ..Default::default() }; let Options::Standalone(opts) = cmd @@ -531,4 +485,88 @@ mod tests { assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir); assert_eq!("debug", opts.logging.level); } + + #[test] + fn test_config_precedence_order() { + let mut file = create_named_temp_file(); + let toml_str = r#" + mode = "standalone" + + [http_options] + addr = "127.0.0.1:4000" + + [logging] + level = "debug" + "#; + write!(file, "{}", toml_str).unwrap(); + + let env_prefix = "STANDALONE_UT"; + temp_env::with_vars( + vec![ + ( + // logging.dir = /other/log/dir + vec![ + env_prefix.to_string(), + "logging".to_uppercase(), + "dir".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("/other/log/dir"), + ), + ( + // logging.level = info + vec![ + env_prefix.to_string(), + "logging".to_uppercase(), + "level".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("info"), + ), + ( + // http_options.addr = 127.0.0.1:24000 + vec![ + env_prefix.to_string(), + "http_options".to_uppercase(), + "addr".to_uppercase(), + ] + .join(ENV_VAR_SEP), + Some("127.0.0.1:24000"), + ), + ], + || { + let command = StartCommand { + config_file: Some(file.path().to_str().unwrap().to_string()), + http_addr: Some("127.0.0.1:14000".to_string()), + env_prefix: env_prefix.to_string(), + ..Default::default() + }; + + let top_level_opts = TopLevelOptions { + log_dir: None, + log_level: None, + }; + let Options::Standalone(opts) = + command.load_options(top_level_opts).unwrap() else {unreachable!()}; + + // Should be read from env, env > default values. + assert_eq!(opts.logging.dir, "/other/log/dir"); + + // Should be read from config file, config file > env > default values. + assert_eq!(opts.logging.level, "debug"); + + // Should be read from cli, cli > config file > env > default values. + assert_eq!( + opts.fe_opts.http_options.as_ref().unwrap().addr, + "127.0.0.1:14000" + ); + + // Should be default value. + assert_eq!( + opts.fe_opts.grpc_options.unwrap().addr, + GrpcOptions::default().addr + ); + }, + ); + } } diff --git a/src/cmd/src/toml_loader.rs b/src/cmd/src/toml_loader.rs deleted file mode 100644 index 9d689e50bc..0000000000 --- a/src/cmd/src/toml_loader.rs +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -macro_rules! from_file { - ($path: expr) => { - toml::from_str( - &std::fs::read_to_string($path) - .context(crate::error::ReadConfigSnafu { path: $path })?, - ) - .context(crate::error::ParseConfigSnafu) - }; -} - -pub(crate) use from_file; - -#[cfg(test)] -mod tests { - use std::fs::File; - use std::io::Write; - - use common_test_util::temp_dir::create_temp_dir; - use serde::{Deserialize, Serialize}; - use snafu::ResultExt; - - use super::*; - use crate::error::Result; - - #[derive(Clone, PartialEq, Debug, Deserialize, Serialize)] - #[serde(default)] - struct MockConfig { - path: String, - port: u32, - host: String, - } - - impl Default for MockConfig { - fn default() -> Self { - Self { - path: "test".to_string(), - port: 0, - host: "localhost".to_string(), - } - } - } - - #[test] - fn test_from_file() -> Result<()> { - let config = MockConfig { - path: "/tmp".to_string(), - port: 999, - host: "greptime.test".to_string(), - }; - - let dir = create_temp_dir("test_from_file"); - let test_file = format!("{}/test.toml", dir.path().to_str().unwrap()); - - let s = toml::to_string(&config).unwrap(); - assert!(s.contains("host") && s.contains("path") && s.contains("port")); - - let mut file = File::create(&test_file).unwrap(); - file.write_all(s.as_bytes()).unwrap(); - - let loaded_config: MockConfig = from_file!(&test_file)?; - assert_eq!(loaded_config, config); - - // Only host in file - let mut file = File::create(&test_file).unwrap(); - file.write_all("host='greptime.test'\n".as_bytes()).unwrap(); - - let loaded_config: MockConfig = from_file!(&test_file)?; - assert_eq!(loaded_config.host, "greptime.test"); - assert_eq!(loaded_config.port, 0); - assert_eq!(loaded_config.path, "test"); - - // Truncate the file. - let file = File::create(&test_file).unwrap(); - file.set_len(0).unwrap(); - let loaded_config: MockConfig = from_file!(&test_file)?; - assert_eq!(loaded_config, MockConfig::default()); - - Ok(()) - } -}