refactor: add Configurable trait (#3917)

* refactor: add Configurable trait

* refactor: add merge_with_cli_options() to simplify load_options()

* docs: add comments

* fix: clippy errors

* fix: toml format

* fix: build error

* fix: clippy errors

* build: downgrade config-rs

* refactor: use '#[snafu(source(from()))'

* refactor: minor modification for load_layered_options() to make it clean
This commit is contained in:
zyy17
2024-05-15 20:56:40 +08:00
committed by GitHub
parent 6c621b7fcf
commit 63a8d293a1
26 changed files with 508 additions and 294 deletions

16
Cargo.lock generated
View File

@@ -1624,7 +1624,6 @@ dependencies = [
"common-time",
"common-version",
"common-wal",
"config",
"datanode",
"datatypes",
"either",
@@ -1728,9 +1727,22 @@ name = "common-config"
version = "0.7.2"
dependencies = [
"common-base",
"common-error",
"common-macro",
"common-telemetry",
"common-test-util",
"common-wal",
"config",
"datanode",
"meta-client",
"num_cpus",
"serde",
"serde_json",
"snafu 0.8.2",
"sysinfo",
"temp-env",
"tempfile",
"toml 0.8.12",
]
[[package]]
@@ -3149,6 +3161,7 @@ dependencies = [
"catalog",
"client",
"common-base",
"common-config",
"common-error",
"common-function",
"common-greptimedb-telemetry",
@@ -5601,6 +5614,7 @@ dependencies = [
"client",
"common-base",
"common-catalog",
"common-config",
"common-error",
"common-greptimedb-telemetry",
"common-grpc",

View File

@@ -100,6 +100,7 @@ bytemuck = "1.12"
bytes = { version = "1.5", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }

View File

@@ -40,7 +40,6 @@ common-telemetry = { workspace = true, features = [
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
config = "0.13"
datanode.workspace = true
datatypes.workspace = true
either = "1.8"

View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_wal::config::DatanodeWalConfig;
@@ -28,7 +29,9 @@ use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use crate::error::{MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu};
use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
};
use crate::options::{GlobalOptions, Options};
use crate::App;
@@ -133,12 +136,24 @@ struct StartCommand {
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
let mut opts: DatanodeOptions = Options::load_layered_options(
Ok(Options::Datanode(Box::new(
self.merge_with_cli_options(
global_options,
DatanodeOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
DatanodeOptions::env_list_keys(),
)?;
)
.context(LoadLayeredConfigSnafu)?,
)?,
)))
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
mut opts: DatanodeOptions,
) -> Result<DatanodeOptions> {
if let Some(dir) = &global_options.log_dir {
opts.logging.dir.clone_from(dir);
}
@@ -208,7 +223,7 @@ impl StartCommand {
// Disable dashboard in datanode.
opts.http.disable_dashboard = true;
Ok(Options::Datanode(Box::new(opts)))
Ok(opts)
}
async fn build(self, mut opts: DatanodeOptions) -> Result<Instance> {
@@ -259,13 +274,14 @@ mod tests {
use std::io::Write;
use std::time::Duration;
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use super::*;
use crate::options::{GlobalOptions, ENV_VAR_SEP};
use crate::options::GlobalOptions;
#[test]
fn test_read_from_config_file() {

View File

@@ -17,7 +17,6 @@ use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use config::ConfigError;
use rustyline::error::ReadlineError;
use snafu::{Location, Snafu};
@@ -209,8 +208,8 @@ pub enum Error {
#[snafu(display("Failed to load layered config"))]
LoadLayeredConfig {
#[snafu(source)]
error: ConfigError,
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
#[snafu(implicit)]
location: Location,
},

View File

@@ -23,6 +23,7 @@ use cache::{
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_config::Configurable;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
@@ -40,7 +41,9 @@ use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, InitTimezoneSnafu, MissingConfigSnafu, Result, StartFrontendSnafu};
use crate::error::{
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu,
};
use crate::options::{GlobalOptions, Options};
use crate::App;
@@ -153,12 +156,24 @@ pub struct StartCommand {
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
let mut opts: FrontendOptions = Options::load_layered_options(
Ok(Options::Frontend(Box::new(
self.merge_with_cli_options(
global_options,
FrontendOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
FrontendOptions::env_list_keys(),
)?;
)
.context(LoadLayeredConfigSnafu)?,
)?,
)))
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
mut opts: FrontendOptions,
) -> Result<FrontendOptions> {
if let Some(dir) = &global_options.log_dir {
opts.logging.dir.clone_from(dir);
}
@@ -220,7 +235,7 @@ impl StartCommand {
opts.user_provider.clone_from(&self.user_provider);
Ok(Options::Frontend(Box::new(opts)))
Ok(opts)
}
async fn build(self, mut opts: FrontendOptions) -> Result<Instance> {
@@ -336,12 +351,13 @@ mod tests {
use auth::{Identity, Password, UserProviderRef};
use common_base::readable_size::ReadableSize;
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use frontend::service_config::GrpcOptions;
use servers::http::HttpOptions;
use super::*;
use crate::options::{GlobalOptions, ENV_VAR_SEP};
use crate::options::GlobalOptions;
#[test]
fn test_try_from_start_command() {

View File

@@ -16,13 +16,14 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
use crate::error::{self, Result, StartMetaServerSnafu};
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
use crate::options::{GlobalOptions, Options};
use crate::App;
@@ -128,12 +129,24 @@ struct StartCommand {
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
let mut opts: MetasrvOptions = Options::load_layered_options(
Ok(Options::Metasrv(Box::new(
self.merge_with_cli_options(
global_options,
MetasrvOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
MetasrvOptions::env_list_keys(),
)?;
)
.context(LoadLayeredConfigSnafu)?,
)?,
)))
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
mut opts: MetasrvOptions,
) -> Result<MetasrvOptions> {
if let Some(dir) = &global_options.log_dir {
opts.logging.dir.clone_from(dir);
}
@@ -196,7 +209,7 @@ impl StartCommand {
// Disable dashboard in metasrv.
opts.http.disable_dashboard = true;
Ok(Options::Metasrv(Box::new(opts)))
Ok(opts)
}
async fn build(self, mut opts: MetasrvOptions) -> Result<Instance> {
@@ -225,11 +238,11 @@ mod tests {
use std::io::Write;
use common_base::readable_size::ReadableSize;
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use meta_srv::selector::SelectorType;
use super::*;
use crate::options::ENV_VAR_SEP;
#[test]
fn test_read_from_cmd() {

View File

@@ -14,19 +14,12 @@
use clap::Parser;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use config::{Config, Environment, File, FileFormat};
use datanode::config::DatanodeOptions;
use frontend::frontend::FrontendOptions;
use meta_srv::metasrv::MetasrvOptions;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu};
use crate::standalone::StandaloneOptions;
pub const ENV_VAR_SEP: &str = "__";
pub const ENV_LIST_SEP: &str = ",";
pub enum Options {
Datanode(Box<DatanodeOptions>),
Frontend(Box<FrontendOptions>),
@@ -71,65 +64,6 @@ impl Options {
}
}
/// Load the configuration from multiple sources and merge them.
/// The precedence order is: config file > environment variables > default values.
/// `env_prefix` is the prefix of environment variables, e.g. "FRONTEND__xxx".
/// The function will use dunder(double underscore) `__` as the separator for environment variables, for example:
/// `DATANODE__STORAGE__MANIFEST__CHECKPOINT_MARGIN` will be mapped to `DatanodeOptions.storage.manifest.checkpoint_margin` field in the configuration.
/// `list_keys` is the list of keys that should be parsed as a list, for example, you can pass `Some(&["meta_client_options.metasrv_addrs"]` to parse `GREPTIMEDB_METASRV__META_CLIENT_OPTIONS__METASRV_ADDRS` as a list.
/// The function will use comma `,` as the separator for list values, for example: `127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003`.
pub fn load_layered_options<'de, T: Serialize + Deserialize<'de> + Default>(
config_file: Option<&str>,
env_prefix: &str,
list_keys: Option<&[&str]>,
) -> Result<T> {
let default_opts = T::default();
let env_source = {
let mut env = Environment::default();
if !env_prefix.is_empty() {
env = env.prefix(env_prefix);
}
if let Some(list_keys) = list_keys {
env = env.list_separator(ENV_LIST_SEP);
for key in list_keys {
env = env.with_list_parse_key(key);
}
}
env.try_parsing(true)
.separator(ENV_VAR_SEP)
.ignore_empty(true)
};
// Workaround: Replacement for `Config::try_from(&default_opts)` due to
// `ConfigSerializer` cannot handle the case of an empty struct contained
// within an iterative structure.
// See: https://github.com/mehcode/config-rs/issues/461
let json_str = serde_json::to_string(&default_opts).context(SerdeJsonSnafu)?;
let default_config = File::from_str(&json_str, FileFormat::Json);
// Add default values and environment variables as the sources of the configuration.
let mut layered_config = Config::builder()
.add_source(default_config)
.add_source(env_source);
// Add config file as the source of the configuration if it is specified.
if let Some(config_file) = config_file {
layered_config = layered_config.add_source(File::new(config_file, FileFormat::Toml));
}
let opts = layered_config
.build()
.context(LoadLayeredConfigSnafu)?
.try_deserialize()
.context(LoadLayeredConfigSnafu)?;
Ok(opts)
}
pub fn node_id(&self) -> Option<String> {
match self {
Options::Metasrv(_) | Options::Cli(_) | Options::Standalone(_) => None,
@@ -138,127 +72,3 @@ impl Options {
}
}
}
#[cfg(test)]
mod tests {
use std::io::Write;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
use super::*;
#[test]
fn test_load_layered_options() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
rpc_addr = "127.0.0.1:3001"
rpc_hostname = "127.0.0.1"
rpc_runtime_size = 8
mysql_addr = "127.0.0.1:4406"
mysql_runtime_size = 2
[meta_client]
timeout = "3s"
connect_timeout = "5s"
tcp_nodelay = true
[wal]
provider = "raft_engine"
dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
let env_prefix = "DATANODE_UT";
temp_env::with_vars(
// The following environment variables will be used to override the values in the config file.
[
(
// storage.type = S3
[
env_prefix.to_string(),
"storage".to_uppercase(),
"type".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("S3"),
),
(
// storage.bucket = mybucket
[
env_prefix.to_string(),
"storage".to_uppercase(),
"bucket".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("mybucket"),
),
(
// wal.dir = /other/wal/dir
[
env_prefix.to_string(),
"wal".to_uppercase(),
"dir".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("/other/wal/dir"),
),
(
// meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
[
env_prefix.to_string(),
"meta_client".to_uppercase(),
"metasrv_addrs".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
),
],
|| {
let opts: DatanodeOptions = Options::load_layered_options(
Some(file.path().to_str().unwrap()),
env_prefix,
DatanodeOptions::env_list_keys(),
)
.unwrap();
// Check the configs from environment variables.
match &opts.storage.store {
ObjectStoreConfig::S3(s3_config) => {
assert_eq!(s3_config.bucket, "mybucket".to_string());
}
_ => panic!("unexpected store type"),
}
assert_eq!(
opts.meta_client.unwrap().metasrv_addrs,
vec![
"127.0.0.1:3001".to_string(),
"127.0.0.1:3002".to_string(),
"127.0.0.1:3003".to_string()
]
);
// Should be the values from config file, not environment variables.
let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");
// Should be default values.
assert_eq!(opts.node_id, None);
},
);
}
}

View File

@@ -23,7 +23,7 @@ use cache::{
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, KvBackendConfig};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
@@ -45,7 +45,6 @@ use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use frontend::error::{Result as FeResult, TomlFormatSnafu};
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -64,9 +63,9 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{
BuildCacheRegistrySnafu, CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu,
InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, Result, ShutdownDatanodeSnafu,
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, Options};
use crate::App;
@@ -131,12 +130,6 @@ pub struct StandaloneOptions {
pub tracing: TracingOptions,
}
impl StandaloneOptions {
pub fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
}
}
impl Default for StandaloneOptions {
fn default() -> Self {
Self {
@@ -166,6 +159,12 @@ impl Default for StandaloneOptions {
}
}
impl Configurable<'_> for StandaloneOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
}
}
impl StandaloneOptions {
pub fn frontend_options(&self) -> FrontendOptions {
let cloned_opts = self.clone();
@@ -200,10 +199,6 @@ impl StandaloneOptions {
..Default::default()
}
}
pub fn to_toml(&self) -> FeResult<String> {
toml::to_string(self).context(TomlFormatSnafu)
}
}
pub struct Instance {
@@ -292,12 +287,25 @@ pub struct StartCommand {
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
let mut opts: StandaloneOptions = Options::load_layered_options(
Ok(Options::Standalone(Box::new(
self.merge_with_cli_options(
global_options,
StandaloneOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
StandaloneOptions::env_list_keys(),
)?;
)
.context(LoadLayeredConfigSnafu)?,
)?,
)))
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
mut opts: StandaloneOptions,
) -> Result<StandaloneOptions> {
// Should always be standalone mode.
opts.mode = Mode::Standalone;
if let Some(dir) = &global_options.log_dir {
@@ -358,7 +366,7 @@ impl StartCommand {
opts.user_provider.clone_from(&self.user_provider);
Ok(Options::Standalone(Box::new(opts)))
Ok(opts)
}
#[allow(unreachable_code)]
@@ -541,13 +549,14 @@ mod tests {
use auth::{Identity, Password, UserProviderRef};
use common_base::readable_size::ReadableSize;
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{FileConfig, GcsConfig};
use servers::Mode;
use super::*;
use crate::options::{GlobalOptions, ENV_VAR_SEP};
use crate::options::GlobalOptions;
#[tokio::test]
async fn test_try_from_start_command_to_anymap() {
@@ -789,8 +798,8 @@ mod tests {
#[test]
fn test_load_default_standalone_options() {
let options: StandaloneOptions =
Options::load_layered_options(None, "GREPTIMEDB_FRONTEND", None).unwrap();
let options =
StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
let default_options = StandaloneOptions::default();
assert_eq!(options.mode, default_options.mode);
assert_eq!(options.enable_telemetry, default_options.enable_telemetry);

View File

@@ -9,6 +9,22 @@ workspace = true
[dependencies]
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
config.workspace = true
num_cpus.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
sysinfo.workspace = true
toml.workspace = true
[dev-dependencies]
common-telemetry.workspace = true
common-test-util.workspace = true
common-wal.workspace = true
datanode.workspace = true
meta-client.workspace = true
serde.workspace = true
temp-env = "0.3"
tempfile.workspace = true

View File

@@ -0,0 +1,248 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use config::{Environment, File, FileFormat};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu, TomlFormatSnafu};
/// Separator for environment variables. For example, `DATANODE__STORAGE__MANIFEST__CHECKPOINT_MARGIN`.
pub const ENV_VAR_SEP: &str = "__";
/// Separator for list values in environment variables. For example, `localhost:3001,localhost:3002,localhost:3003`.
pub const ENV_LIST_SEP: &str = ",";
/// Configuration trait defines the common interface for configuration that can be loaded from multiple sources and serialized to TOML.
pub trait Configurable<'de>: Serialize + Deserialize<'de> + Default + Sized {
/// Load the configuration from multiple sources and merge them.
/// The precedence order is: config file > environment variables > default values.
/// `env_prefix` is the prefix of environment variables, e.g. "FRONTEND__xxx".
/// The function will use dunder(double underscore) `__` as the separator for environment variables, for example:
/// `DATANODE__STORAGE__MANIFEST__CHECKPOINT_MARGIN` will be mapped to `DatanodeOptions.storage.manifest.checkpoint_margin` field in the configuration.
/// `list_keys` is the list of keys that should be parsed as a list, for example, you can pass `Some(&["meta_client_options.metasrv_addrs"]` to parse `GREPTIMEDB_METASRV__META_CLIENT_OPTIONS__METASRV_ADDRS` as a list.
/// The function will use comma `,` as the separator for list values, for example: `127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003`.
fn load_layered_options(config_file: Option<&str>, env_prefix: &str) -> Result<Self> {
let default_opts = Self::default();
let env_source = {
let mut env = Environment::default();
if !env_prefix.is_empty() {
env = env.prefix(env_prefix);
}
if let Some(list_keys) = Self::env_list_keys() {
env = env.list_separator(ENV_LIST_SEP);
for key in list_keys {
env = env.with_list_parse_key(key);
}
}
env.try_parsing(true)
.separator(ENV_VAR_SEP)
.ignore_empty(true)
};
// Workaround: Replacement for `Config::try_from(&default_opts)` due to
// `ConfigSerializer` cannot handle the case of an empty struct contained
// within an iterative structure.
// See: https://github.com/mehcode/config-rs/issues/461
let json_str = serde_json::to_string(&default_opts).context(SerdeJsonSnafu)?;
let default_config = File::from_str(&json_str, FileFormat::Json);
// Add default values and environment variables as the sources of the configuration.
let mut layered_config = config::Config::builder()
.add_source(default_config)
.add_source(env_source);
// Add config file as the source of the configuration if it is specified.
if let Some(config_file) = config_file {
layered_config = layered_config.add_source(File::new(config_file, FileFormat::Toml));
}
let opts = layered_config
.build()
.and_then(|x| x.try_deserialize())
.context(LoadLayeredConfigSnafu)?;
Ok(opts)
}
/// List of toml keys that should be parsed as a list.
fn env_list_keys() -> Option<&'static [&'static str]> {
None
}
/// Serialize the configuration to a TOML string.
fn to_toml(&self) -> Result<String> {
toml::to_string(&self).context(TomlFormatSnafu)
}
}
#[cfg(test)]
mod tests {
use std::io::Write;
use common_telemetry::logging::LoggingOptions;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{ObjectStoreConfig, StorageConfig};
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
use super::*;
use crate::Mode;
#[derive(Debug, Serialize, Deserialize)]
struct TestDatanodeConfig {
mode: Mode,
node_id: Option<u64>,
logging: LoggingOptions,
meta_client: Option<MetaClientOptions>,
wal: DatanodeWalConfig,
storage: StorageConfig,
}
impl Default for TestDatanodeConfig {
fn default() -> Self {
Self {
mode: Mode::Distributed,
node_id: None,
logging: LoggingOptions::default(),
meta_client: None,
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
}
}
}
impl Configurable<'_> for TestDatanodeConfig {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs"])
}
}
#[test]
fn test_load_layered_options() {
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
rpc_addr = "127.0.0.1:3001"
rpc_hostname = "127.0.0.1"
rpc_runtime_size = 8
mysql_addr = "127.0.0.1:4406"
mysql_runtime_size = 2
[meta_client]
timeout = "3s"
connect_timeout = "5s"
tcp_nodelay = true
[wal]
provider = "raft_engine"
dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
"#;
write!(file, "{}", toml_str).unwrap();
let env_prefix = "DATANODE_UT";
temp_env::with_vars(
// The following environment variables will be used to override the values in the config file.
[
(
// storage.type = S3
[
env_prefix.to_string(),
"storage".to_uppercase(),
"type".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("S3"),
),
(
// storage.bucket = mybucket
[
env_prefix.to_string(),
"storage".to_uppercase(),
"bucket".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("mybucket"),
),
(
// wal.dir = /other/wal/dir
[
env_prefix.to_string(),
"wal".to_uppercase(),
"dir".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("/other/wal/dir"),
),
(
// meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
[
env_prefix.to_string(),
"meta_client".to_uppercase(),
"metasrv_addrs".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
),
],
|| {
let opts = TestDatanodeConfig::load_layered_options(
Some(file.path().to_str().unwrap()),
env_prefix,
)
.unwrap();
// Check the configs from environment variables.
match &opts.storage.store {
ObjectStoreConfig::S3(s3_config) => {
assert_eq!(s3_config.bucket, "mybucket".to_string());
}
_ => panic!("unexpected store type"),
}
assert_eq!(
opts.meta_client.unwrap().metasrv_addrs,
vec![
"127.0.0.1:3001".to_string(),
"127.0.0.1:3002".to_string(),
"127.0.0.1:3003".to_string()
]
);
// Should be the values from config file, not environment variables.
let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");
// Should be default values.
assert_eq!(opts.node_id, None);
},
);
}
}

View File

@@ -0,0 +1,67 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use config::ConfigError;
use snafu::{Location, Snafu};
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to load layered config"))]
LoadLayeredConfig {
#[snafu(source)]
error: ConfigError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serde json"))]
SerdeJson {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize options to TOML"))]
TomlFormat {
#[snafu(source)]
error: toml::ser::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::TomlFormat { .. } | Error::LoadLayeredConfig { .. } => {
StatusCode::InvalidArguments
}
Error::SerdeJson { .. } => StatusCode::Unexpected,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod config;
pub mod error;
pub mod utils;
use common_base::readable_size::ReadableSize;
pub use config::*;
use serde::{Deserialize, Serialize};
pub fn metadata_store_dir(store_dir: &str) -> String {

View File

@@ -18,6 +18,7 @@ bytes.workspace = true
catalog.workspace = true
client.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-function.workspace = true
common-greptimedb-telemetry.workspace = true

View File

@@ -16,6 +16,7 @@
use common_base::readable_size::ReadableSize;
use common_base::secrets::SecretString;
use common_config::Configurable;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
@@ -266,14 +267,10 @@ impl Default for DatanodeOptions {
}
}
impl DatanodeOptions {
pub fn env_list_keys() -> Option<&'static [&'static str]> {
impl Configurable<'_> for DatanodeOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
}
pub fn to_toml_string(&self) -> String {
toml::to_string(&self).unwrap()
}
}
#[allow(clippy::large_enum_variant)]

View File

@@ -359,6 +359,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize options to TOML"))]
TomlFormat {
#[snafu(implicit)]
location: Location,
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -391,7 +399,8 @@ impl ErrorExt for Error {
| MissingNodeId { .. }
| ColumnNoneDefaultValue { .. }
| MissingWalDirConfig { .. }
| MissingKvBackend { .. } => StatusCode::InvalidArguments,
| MissingKvBackend { .. }
| TomlFormat { .. } => StatusCode::InvalidArguments,
PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => {
StatusCode::Unexpected

View File

@@ -15,6 +15,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
use common_config::Configurable;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
@@ -23,7 +24,7 @@ use servers::server::{ServerHandler, ServerHandlers};
use snafu::ResultExt;
use crate::config::DatanodeOptions;
use crate::error::{ParseAddrSnafu, Result};
use crate::error::{ParseAddrSnafu, Result, TomlFormatSnafu};
use crate::region_server::RegionServer;
pub struct DatanodeServiceBuilder<'a> {
@@ -75,7 +76,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
if self.enable_http_service {
let http_server = HttpServerBuilder::new(self.opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(self.opts.to_toml_string())
.with_greptime_config_options(self.opts.to_toml().context(TomlFormatSnafu)?)
.build();
let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
addr: &self.opts.http.addr,

View File

@@ -343,8 +343,10 @@ pub enum Error {
#[snafu(display("Failed to serialize options to TOML"))]
TomlFormat {
#[snafu(source)]
error: toml::ser::Error,
#[snafu(implicit)]
location: Location,
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
},
#[snafu(display("Failed to get cache from cache registry: {}", name))]

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_config::config::Configurable;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
@@ -19,9 +20,7 @@ use servers::export_metrics::ExportMetricsOption;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
use snafu::prelude::*;
use crate::error::{Result, TomlFormatSnafu};
use crate::service_config::{
DatanodeOptions, GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, OtlpOptions,
PostgresOptions, PromStoreOptions,
@@ -75,24 +74,10 @@ impl Default for FrontendOptions {
}
}
impl FrontendOptions {
pub fn env_list_keys() -> Option<&'static [&'static str]> {
impl Configurable<'_> for FrontendOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs"])
}
pub fn to_toml_string(&self) -> String {
toml::to_string(&self).unwrap()
}
}
pub trait TomlSerializable {
fn to_toml(&self) -> Result<String>;
}
impl TomlSerializable for FrontendOptions {
fn to_toml(&self) -> Result<String> {
toml::to_string(&self).context(TomlFormatSnafu)
}
}
#[cfg(test)]

View File

@@ -30,7 +30,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use client::OutputData;
use common_base::Plugins;
use common_config::KvBackendConfig;
use common_config::{Configurable, KvBackendConfig};
use common_error::ext::{BoxedError, ErrorExt};
use common_frontend::handler::FrontendInvoker;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -86,7 +86,7 @@ use crate::error::{
PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu,
TableOperationSnafu,
};
use crate::frontend::{FrontendOptions, TomlSerializable};
use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
@@ -188,7 +188,7 @@ impl Instance {
pub fn build_servers(
&mut self,
opts: impl Into<FrontendOptions> + TomlSerializable,
opts: impl Into<FrontendOptions> + for<'de> Configurable<'de>,
servers: ServerHandlers,
) -> Result<()> {
let opts: FrontendOptions = opts.into();

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use auth::UserProviderRef;
use common_base::Plugins;
use common_config::Configurable;
use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
@@ -31,14 +32,14 @@ use servers::server::{Server, ServerHandlers};
use servers::tls::{maybe_watch_tls_config, ReloadableTlsServerConfig};
use snafu::ResultExt;
use crate::error::{self, Result, StartServerSnafu};
use crate::frontend::{FrontendOptions, TomlSerializable};
use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
use crate::frontend::FrontendOptions;
use crate::instance::FrontendInstance;
use crate::service_config::GrpcOptions;
pub struct Services<T, U>
where
T: Into<FrontendOptions> + TomlSerializable + Clone,
T: Into<FrontendOptions> + for<'de> Configurable<'de> + Clone,
U: FrontendInstance,
{
opts: T,
@@ -50,7 +51,7 @@ where
impl<T, U> Services<T, U>
where
T: Into<FrontendOptions> + TomlSerializable + Clone,
T: Into<FrontendOptions> + for<'de> Configurable<'de> + Clone,
U: FrontendInstance,
{
pub fn new(opts: T, instance: Arc<U>, plugins: Plugins) -> Self {
@@ -171,7 +172,7 @@ where
let opts = self.opts.clone();
let instance = self.instance.clone();
let toml = opts.to_toml()?;
let toml = opts.to_toml().context(TomlFormatSnafu)?;
let opts: FrontendOptions = opts.into();
let handlers = ServerHandlers::default();

View File

@@ -16,6 +16,7 @@ async-trait = "0.1"
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-greptimedb-telemetry.workspace = true
common-grpc.workspace = true

View File

@@ -20,6 +20,7 @@ use api::v1::meta::lock_server::LockServer;
use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -38,7 +39,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
use crate::error::InitExportMetricsTaskSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::lock::etcd::EtcdLock;
use crate::lock::memory::MemLock;
use crate::metasrv::builder::MetasrvBuilder;
@@ -73,7 +74,7 @@ impl MetasrvInstance {
let httpsrv = Arc::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml_string())
.with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?)
.build(),
);
// put metasrv into plugins for later use

View File

@@ -832,6 +832,14 @@ pub enum Error {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to serialize options to TOML"))]
TomlFormat {
#[snafu(implicit)]
location: Location,
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
},
}
impl Error {
@@ -903,7 +911,8 @@ impl ErrorExt for Error {
| Error::InitExportMetricsTask { .. }
| Error::InvalidHeartbeatRequest { .. }
| Error::ProcedureNotFound { .. }
| Error::TooManyPartitions { .. } => StatusCode::InvalidArguments,
| Error::TooManyPartitions { .. }
| Error::TomlFormat { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::StatKeyFromUtf8 { .. }

View File

@@ -20,6 +20,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::Configurable;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::ddl::ProcedureExecutorRef;
@@ -113,12 +114,6 @@ pub struct MetasrvOptions {
pub tracing: TracingOptions,
}
impl MetasrvOptions {
pub fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
}
}
impl Default for MetasrvOptions {
fn default() -> Self {
Self {
@@ -153,9 +148,9 @@ impl Default for MetasrvOptions {
}
}
impl MetasrvOptions {
pub fn to_toml_string(&self) -> String {
toml::to_string(&self).unwrap()
impl Configurable<'_> for MetasrvOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
}
}

View File

@@ -22,6 +22,7 @@ use auth::UserProviderRef;
use axum::Router;
use catalog::kvbackend::KvBackendCatalogManager;
use common_base::secrets::ExposeSecret;
use common_config::Configurable;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_runtime::Builder as RuntimeBuilder;
@@ -391,7 +392,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
None,
)
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.opts.datanode_options().to_toml_string())
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();
(http_server.build(http_server.make_app()), instance.guard)
}
@@ -463,7 +464,7 @@ pub async fn setup_test_prom_app_with_frontend(
)
.with_prom_handler(frontend_ref.clone(), true, is_strict_mode)
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.opts.datanode_options().to_toml_string())
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();
let app = http_server.build(http_server.make_app());
(app, instance.guard)