feat: unify all protocol options (#2316)

* feat: unify all protocol options

* feat: adds enable to example configs

* chore: style

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

---------

Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
dennis zhuang
2023-09-03 22:48:07 -05:00
committed by Ruihang Xia
parent b1599ad3a5
commit a3d5931fca
9 changed files with 158 additions and 211 deletions

View File

@@ -20,6 +20,7 @@ runtime_size = 8
# MySQL server options, see `standalone.example.toml`.
[mysql_options]
enable = true
addr = "127.0.0.1:4002"
runtime_size = 2
@@ -31,6 +32,7 @@ key_path = ""
# PostgresSQL server options, see `standalone.example.toml`.
[postgres_options]
enable = true
addr = "127.0.0.1:4003"
runtime_size = 2
@@ -42,6 +44,7 @@ key_path = ""
# OpenTSDB protocol options, see `standalone.example.toml`.
[opentsdb_options]
enable = true
addr = "127.0.0.1:4242"
runtime_size = 2

View File

@@ -24,6 +24,8 @@ runtime_size = 8
# MySQL server options.
[mysql_options]
# Whether to enable
enable = true
# Server address, "127.0.0.1:4002" by default.
addr = "127.0.0.1:4002"
# The number of server worker threads, 2 by default.
@@ -45,6 +47,8 @@ key_path = ""
# PostgresSQL server options.
[postgres_options]
# Whether to enable
enable = true
# Server address, "127.0.0.1:4003" by default.
addr = "127.0.0.1:4003"
# The number of server worker threads, 2 by default.
@@ -61,6 +65,8 @@ key_path = ""
# OpenTSDB protocol options.
[opentsdb_options]
# Whether to enable
enable = true
# OpenTSDB telnet API server address, "127.0.0.1:4242" by default.
addr = "127.0.0.1:4242"
# The number of server worker threads, 2 by default.

View File

@@ -20,7 +20,6 @@ use common_base::Plugins;
use common_telemetry::logging;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::service_config::InfluxdbOptions;
use meta_client::MetaClientOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
@@ -145,45 +144,36 @@ impl StartCommand {
);
if let Some(addr) = &self.http_addr {
if let Some(http_opts) = &mut opts.http_options {
http_opts.addr = addr.clone()
}
opts.http_options.addr = addr.clone()
}
if let Some(disable_dashboard) = self.disable_dashboard {
opts.http_options
.get_or_insert_with(Default::default)
.disable_dashboard = disable_dashboard;
opts.http_options.disable_dashboard = disable_dashboard;
}
if let Some(addr) = &self.grpc_addr {
if let Some(grpc_opts) = &mut opts.grpc_options {
grpc_opts.addr = addr.clone()
}
opts.grpc_options.addr = addr.clone()
}
if let Some(addr) = &self.mysql_addr {
if let Some(mysql_opts) = &mut opts.mysql_options {
mysql_opts.addr = addr.clone();
mysql_opts.tls = tls_opts.clone();
}
opts.mysql_options.enable = true;
opts.mysql_options.addr = addr.clone();
opts.mysql_options.tls = tls_opts.clone();
}
if let Some(addr) = &self.postgres_addr {
if let Some(postgres_opts) = &mut opts.postgres_options {
postgres_opts.addr = addr.clone();
postgres_opts.tls = tls_opts;
}
opts.postgres_options.enable = true;
opts.postgres_options.addr = addr.clone();
opts.postgres_options.tls = tls_opts;
}
if let Some(addr) = &self.opentsdb_addr {
if let Some(opentsdb_addr) = &mut opts.opentsdb_options {
opentsdb_addr.addr = addr.clone();
}
opts.opentsdb_options.enable = true;
opts.opentsdb_options.addr = addr.clone();
}
if let Some(enable) = self.influxdb_enable {
opts.influxdb_options = Some(InfluxdbOptions { enable });
opts.influxdb_options.enable = enable;
}
if let Some(metasrv_addrs) = &self.metasrv_addr {
@@ -255,40 +245,32 @@ mod tests {
unreachable!()
};
assert_eq!(opts.http_options.as_ref().unwrap().addr, "127.0.0.1:1234");
assert_eq!(
ReadableSize::mb(64),
opts.http_options.as_ref().unwrap().body_limit
);
assert_eq!(opts.mysql_options.as_ref().unwrap().addr, "127.0.0.1:5678");
assert_eq!(
opts.postgres_options.as_ref().unwrap().addr,
"127.0.0.1:5432"
);
assert_eq!(
opts.opentsdb_options.as_ref().unwrap().addr,
"127.0.0.1:4321"
);
assert_eq!(opts.http_options.addr, "127.0.0.1:1234");
assert_eq!(ReadableSize::mb(64), opts.http_options.body_limit);
assert_eq!(opts.mysql_options.addr, "127.0.0.1:5678");
assert_eq!(opts.postgres_options.addr, "127.0.0.1:5432");
assert_eq!(opts.opentsdb_options.addr, "127.0.0.1:4321");
let default_opts = FrontendOptions::default();
assert_eq!(opts.grpc_options.addr, default_opts.grpc_options.addr);
assert!(opts.mysql_options.enable);
assert_eq!(
opts.grpc_options.unwrap().addr,
default_opts.grpc_options.unwrap().addr
opts.mysql_options.runtime_size,
default_opts.mysql_options.runtime_size
);
assert!(opts.postgres_options.enable);
assert_eq!(
opts.mysql_options.as_ref().unwrap().runtime_size,
default_opts.mysql_options.as_ref().unwrap().runtime_size
opts.postgres_options.runtime_size,
default_opts.postgres_options.runtime_size
);
assert!(opts.opentsdb_options.enable);
assert_eq!(
opts.postgres_options.as_ref().unwrap().runtime_size,
default_opts.postgres_options.as_ref().unwrap().runtime_size
);
assert_eq!(
opts.opentsdb_options.as_ref().unwrap().runtime_size,
default_opts.opentsdb_options.as_ref().unwrap().runtime_size
opts.opentsdb_options.runtime_size,
default_opts.opentsdb_options.runtime_size
);
assert!(!opts.influxdb_options.unwrap().enable);
assert!(!opts.influxdb_options.enable);
}
#[test]
@@ -319,19 +301,10 @@ mod tests {
unreachable!()
};
assert_eq!(Mode::Distributed, fe_opts.mode);
assert_eq!(
"127.0.0.1:4000".to_string(),
fe_opts.http_options.as_ref().unwrap().addr
);
assert_eq!(
Duration::from_secs(30),
fe_opts.http_options.as_ref().unwrap().timeout
);
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http_options.addr);
assert_eq!(Duration::from_secs(30), fe_opts.http_options.timeout);
assert_eq!(
ReadableSize::gb(2),
fe_opts.http_options.as_ref().unwrap().body_limit
);
assert_eq!(ReadableSize::gb(2), fe_opts.http_options.body_limit);
assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), fe_opts.logging.dir);
@@ -457,7 +430,7 @@ mod tests {
};
// Should be read from env, env > default values.
assert_eq!(fe_opts.mysql_options.as_ref().unwrap().runtime_size, 11);
assert_eq!(fe_opts.mysql_options.runtime_size, 11);
assert_eq!(
fe_opts.meta_client_options.unwrap().metasrv_addrs,
vec![
@@ -468,22 +441,13 @@ mod tests {
);
// Should be read from config file, config file > env > default values.
assert_eq!(
fe_opts.mysql_options.as_ref().unwrap().addr,
"127.0.0.1:4002"
);
assert_eq!(fe_opts.mysql_options.addr, "127.0.0.1:4002");
// Should be read from cli, cli > config file > env > default values.
assert_eq!(
fe_opts.http_options.as_ref().unwrap().addr,
"127.0.0.1:14000"
);
assert_eq!(fe_opts.http_options.addr, "127.0.0.1:14000");
// Should be default value.
assert_eq!(
fe_opts.grpc_options.as_ref().unwrap().addr,
GrpcOptions::default().addr
);
assert_eq!(fe_opts.grpc_options.addr, GrpcOptions::default().addr);
},
);
}

View File

@@ -83,13 +83,13 @@ pub struct StandaloneOptions {
pub mode: Mode,
pub enable_memory_catalog: bool,
pub enable_telemetry: bool,
pub http_options: Option<HttpOptions>,
pub grpc_options: Option<GrpcOptions>,
pub mysql_options: Option<MysqlOptions>,
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prom_store_options: Option<PromStoreOptions>,
pub http_options: HttpOptions,
pub grpc_options: GrpcOptions,
pub mysql_options: MysqlOptions,
pub postgres_options: PostgresOptions,
pub opentsdb_options: OpentsdbOptions,
pub influxdb_options: InfluxdbOptions,
pub prom_store_options: PromStoreOptions,
pub wal: WalConfig,
pub storage: StorageConfig,
pub procedure: ProcedureConfig,
@@ -102,13 +102,13 @@ impl Default for StandaloneOptions {
mode: Mode::Standalone,
enable_memory_catalog: false,
enable_telemetry: true,
http_options: Some(HttpOptions::default()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prom_store_options: Some(PromStoreOptions::default()),
http_options: HttpOptions::default(),
grpc_options: GrpcOptions::default(),
mysql_options: MysqlOptions::default(),
postgres_options: PostgresOptions::default(),
opentsdb_options: OpentsdbOptions::default(),
influxdb_options: InfluxdbOptions::default(),
prom_store_options: PromStoreOptions::default(),
wal: WalConfig::default(),
storage: StorageConfig::default(),
procedure: ProcedureConfig::default(),
@@ -234,9 +234,7 @@ impl StartCommand {
);
if let Some(addr) = &self.http_addr {
if let Some(http_opts) = &mut opts.http_options {
http_opts.addr = addr.clone()
}
opts.http_options.addr = addr.clone()
}
if let Some(addr) = &self.rpc_addr {
@@ -250,33 +248,28 @@ impl StartCommand {
}
.fail();
}
if let Some(grpc_opts) = &mut opts.grpc_options {
grpc_opts.addr = addr.clone()
}
opts.grpc_options.addr = addr.clone()
}
if let Some(addr) = &self.mysql_addr {
if let Some(mysql_opts) = &mut opts.mysql_options {
mysql_opts.addr = addr.clone();
mysql_opts.tls = tls_opts.clone();
}
opts.mysql_options.enable = true;
opts.mysql_options.addr = addr.clone();
opts.mysql_options.tls = tls_opts.clone();
}
if let Some(addr) = &self.postgres_addr {
if let Some(postgres_opts) = &mut opts.postgres_options {
postgres_opts.addr = addr.clone();
postgres_opts.tls = tls_opts;
}
opts.postgres_options.enable = true;
opts.postgres_options.addr = addr.clone();
opts.postgres_options.tls = tls_opts;
}
if let Some(addr) = &self.opentsdb_addr {
if let Some(opentsdb_addr) = &mut opts.opentsdb_options {
opentsdb_addr.addr = addr.clone();
}
opts.opentsdb_options.enable = true;
opts.opentsdb_options.addr = addr.clone();
}
if self.influxdb_enable {
opts.influxdb_options = Some(InfluxdbOptions { enable: true });
opts.influxdb_options.enable = self.influxdb_enable;
}
let fe_opts = opts.clone().frontend_options();
@@ -424,32 +417,15 @@ mod tests {
let dn_opts = options.dn_opts;
let logging_opts = options.logging;
assert_eq!(Mode::Standalone, fe_opts.mode);
assert_eq!(
"127.0.0.1:4000".to_string(),
fe_opts.http_options.as_ref().unwrap().addr
);
assert_eq!(
Duration::from_secs(30),
fe_opts.http_options.as_ref().unwrap().timeout
);
assert_eq!(
ReadableSize::mb(128),
fe_opts.http_options.as_ref().unwrap().body_limit
);
assert_eq!(
"127.0.0.1:4001".to_string(),
fe_opts.grpc_options.unwrap().addr
);
assert_eq!(
"127.0.0.1:4002",
fe_opts.mysql_options.as_ref().unwrap().addr
);
assert_eq!(2, fe_opts.mysql_options.as_ref().unwrap().runtime_size);
assert_eq!(
None,
fe_opts.mysql_options.as_ref().unwrap().reject_no_database
);
assert!(fe_opts.influxdb_options.as_ref().unwrap().enable);
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http_options.addr);
assert_eq!(Duration::from_secs(30), fe_opts.http_options.timeout);
assert_eq!(ReadableSize::mb(128), fe_opts.http_options.body_limit);
assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc_options.addr);
assert!(fe_opts.mysql_options.enable);
assert_eq!("127.0.0.1:4002", fe_opts.mysql_options.addr);
assert_eq!(2, fe_opts.mysql_options.runtime_size);
assert_eq!(None, fe_opts.mysql_options.reject_no_database);
assert!(fe_opts.influxdb_options.enable);
assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());
match &dn_opts.storage.store {
@@ -561,20 +537,11 @@ mod tests {
assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
// Should be read from cli, cli > config file > env > default values.
assert_eq!(
opts.fe_opts.http_options.as_ref().unwrap().addr,
"127.0.0.1:14000"
);
assert_eq!(
ReadableSize::mb(64),
opts.fe_opts.http_options.as_ref().unwrap().body_limit
);
assert_eq!(opts.fe_opts.http_options.addr, "127.0.0.1:14000");
assert_eq!(ReadableSize::mb(64), opts.fe_opts.http_options.body_limit);
// Should be default value.
assert_eq!(
opts.fe_opts.grpc_options.unwrap().addr,
GrpcOptions::default().addr
);
assert_eq!(opts.fe_opts.grpc_options.addr, GrpcOptions::default().addr);
},
);
}

View File

@@ -30,14 +30,14 @@ pub struct FrontendOptions {
pub mode: Mode,
pub node_id: Option<String>,
pub heartbeat: HeartbeatOptions,
pub http_options: Option<HttpOptions>,
pub grpc_options: Option<GrpcOptions>,
pub mysql_options: Option<MysqlOptions>,
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prom_store_options: Option<PromStoreOptions>,
pub otlp_options: Option<OtlpOptions>,
pub http_options: HttpOptions,
pub grpc_options: GrpcOptions,
pub mysql_options: MysqlOptions,
pub postgres_options: PostgresOptions,
pub opentsdb_options: OpentsdbOptions,
pub influxdb_options: InfluxdbOptions,
pub prom_store_options: PromStoreOptions,
pub otlp_options: OtlpOptions,
pub meta_client_options: Option<MetaClientOptions>,
pub logging: LoggingOptions,
pub datanode: DatanodeOptions,
@@ -49,14 +49,14 @@ impl Default for FrontendOptions {
mode: Mode::Standalone,
node_id: None,
heartbeat: HeartbeatOptions::default(),
http_options: Some(HttpOptions::default()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prom_store_options: Some(PromStoreOptions::default()),
otlp_options: Some(OtlpOptions::default()),
http_options: HttpOptions::default(),
grpc_options: GrpcOptions::default(),
mysql_options: MysqlOptions::default(),
postgres_options: PostgresOptions::default(),
opentsdb_options: OpentsdbOptions::default(),
influxdb_options: InfluxdbOptions::default(),
prom_store_options: PromStoreOptions::default(),
otlp_options: OtlpOptions::default(),
meta_client_options: None,
logging: LoggingOptions::default(),
datanode: DatanodeOptions::default(),

View File

@@ -37,7 +37,6 @@ use crate::error::Error::StartServer;
use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
use crate::instance::FrontendInstance;
use crate::service_config::{InfluxdbOptions, OtlpOptions, PromStoreOptions};
pub(crate) struct Services;
@@ -57,7 +56,9 @@ impl Services {
let mut result = Vec::<ServerHandler>::with_capacity(plugins.len());
let user_provider = plugins.get::<UserProviderRef>();
if let Some(opts) = &opts.grpc_options {
{
// Always init GRPC server
let opts = &opts.grpc_options;
let grpc_addr = parse_addr(&opts.addr)?;
let grpc_runtime = Arc::new(
@@ -78,9 +79,52 @@ impl Services {
);
result.push((Box::new(grpc_server), grpc_addr));
};
}
if let Some(opts) = &opts.mysql_options {
{
// Always init HTTP server
let http_options = &opts.http_options;
let http_addr = parse_addr(&http_options.addr)?;
let mut http_server_builder = HttpServerBuilder::new(http_options.clone());
let _ = http_server_builder
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()));
if let Some(user_provider) = user_provider.clone() {
let _ = http_server_builder.with_user_provider(user_provider);
}
if opts.opentsdb_options.enable {
let _ = http_server_builder.with_opentsdb_handler(instance.clone());
}
if opts.influxdb_options.enable {
let _ = http_server_builder.with_influxdb_handler(instance.clone());
}
if opts.prom_store_options.enable {
let _ = http_server_builder
.with_prom_handler(instance.clone())
.with_prometheus_handler(instance.clone());
}
if opts.otlp_options.enable {
let _ = http_server_builder.with_otlp_handler(instance.clone());
}
let http_server = http_server_builder
.with_metrics_handler(MetricsHandler)
.with_script_handler(instance.clone())
.with_configurator(plugins.get::<ConfiguratorRef>())
.with_greptime_config_options(opts.to_toml_string())
.build();
result.push((Box::new(http_server), http_addr));
}
if opts.mysql_options.enable {
// Init MySQL server
let opts = &opts.mysql_options;
let mysql_addr = parse_addr(&opts.addr)?;
let mysql_io_runtime = Arc::new(
@@ -110,7 +154,9 @@ impl Services {
result.push((mysql_server, mysql_addr));
}
if let Some(opts) = &opts.postgres_options {
if opts.postgres_options.enable {
// Init PosgresSQL Server
let opts = &opts.postgres_options;
let pg_addr = parse_addr(&opts.addr)?;
let pg_io_runtime = Arc::new(
@@ -131,9 +177,9 @@ impl Services {
result.push((pg_server, pg_addr));
}
let mut set_opentsdb_handler = false;
if let Some(opts) = &opts.opentsdb_options {
if opts.opentsdb_options.enable {
// Init OpenTSDB server
let opts = &opts.opentsdb_options;
let addr = parse_addr(&opts.addr)?;
let io_runtime = Arc::new(
@@ -147,51 +193,6 @@ impl Services {
let server = OpentsdbServer::create_server(instance.clone(), io_runtime);
result.push((server, addr));
set_opentsdb_handler = true;
}
if let Some(http_options) = &opts.http_options {
let http_addr = parse_addr(&http_options.addr)?;
let mut http_server_builder = HttpServerBuilder::new(http_options.clone());
let _ = http_server_builder
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()));
if let Some(user_provider) = user_provider.clone() {
let _ = http_server_builder.with_user_provider(user_provider);
}
if set_opentsdb_handler {
let _ = http_server_builder.with_opentsdb_handler(instance.clone());
}
if matches!(
opts.influxdb_options,
Some(InfluxdbOptions { enable: true })
) {
let _ = http_server_builder.with_influxdb_handler(instance.clone());
}
if matches!(
opts.prom_store_options,
Some(PromStoreOptions { enable: true })
) {
let _ = http_server_builder
.with_prom_handler(instance.clone())
.with_prometheus_handler(instance.clone());
}
if matches!(opts.otlp_options, Some(OtlpOptions { enable: true })) {
let _ = http_server_builder.with_otlp_handler(instance.clone());
}
let http_server = http_server_builder
.with_metrics_handler(MetricsHandler)
.with_script_handler(instance)
.with_configurator(plugins.get::<ConfiguratorRef>())
.with_greptime_config_options(opts.to_toml_string())
.build();
result.push((Box::new(http_server), http_addr));
}
Ok(result

View File

@@ -17,6 +17,7 @@ use servers::tls::TlsOption;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MysqlOptions {
pub enable: bool,
pub addr: String,
pub runtime_size: usize,
#[serde(default = "Default::default")]
@@ -27,6 +28,7 @@ pub struct MysqlOptions {
impl Default for MysqlOptions {
fn default() -> Self {
Self {
enable: true,
addr: "127.0.0.1:4002".to_string(),
runtime_size: 2,
tls: TlsOption::default(),

View File

@@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpentsdbOptions {
pub enable: bool,
pub addr: String,
pub runtime_size: usize,
}
@@ -23,6 +24,7 @@ pub struct OpentsdbOptions {
impl Default for OpentsdbOptions {
fn default() -> Self {
Self {
enable: true,
addr: "127.0.0.1:4242".to_string(),
runtime_size: 2,
}

View File

@@ -17,6 +17,7 @@ use servers::tls::TlsOption;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PostgresOptions {
pub enable: bool,
pub addr: String,
pub runtime_size: usize,
#[serde(default = "Default::default")]
@@ -26,6 +27,7 @@ pub struct PostgresOptions {
impl Default for PostgresOptions {
fn default() -> Self {
Self {
enable: true,
addr: "127.0.0.1:4003".to_string(),
runtime_size: 2,
tls: Default::default(),