refactor: remove MixOptions and use StandaloneOptions only (#3893)

* refactor: remove MixOptions and use StandaloneOptions only

* refactor: refactor code based on code review comments

1. Use '&self' in frontend_options() and datanode_options();

2. Remove unused 'clone()';

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* ci: fix integration error

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
zyy17
2024-05-11 00:08:34 +08:00
committed by GitHub
parent 115c74791d
commit b1ef327bac
7 changed files with 122 additions and 198 deletions

View File

@@ -13,51 +13,25 @@
// limitations under the License.
use clap::Parser;
use common_config::KvBackendConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_wal::config::MetasrvWalConfig;
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
use frontend::error::{Result as FeResult, TomlFormatSnafu};
use frontend::frontend::{FrontendOptions, TomlSerializable};
use datanode::config::DatanodeOptions;
use frontend::frontend::FrontendOptions;
use meta_srv::metasrv::MetasrvOptions;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu};
use crate::standalone::StandaloneOptions;
pub const ENV_VAR_SEP: &str = "__";
pub const ENV_LIST_SEP: &str = ",";
/// Options mixed up from datanode, frontend and metasrv.
#[derive(Serialize, Debug, Clone)]
pub struct MixOptions {
pub data_home: String,
pub procedure: ProcedureConfig,
pub metadata_store: KvBackendConfig,
pub frontend: FrontendOptions,
pub datanode: DatanodeOptions,
pub logging: LoggingOptions,
pub wal_meta: MetasrvWalConfig,
}
impl From<MixOptions> for FrontendOptions {
fn from(value: MixOptions) -> Self {
value.frontend
}
}
impl TomlSerializable for MixOptions {
fn to_toml(&self) -> FeResult<String> {
toml::to_string(self).context(TomlFormatSnafu)
}
}
pub enum Options {
Datanode(Box<DatanodeOptions>),
Frontend(Box<FrontendOptions>),
Metasrv(Box<MetasrvOptions>),
Standalone(Box<MixOptions>),
Standalone(Box<StandaloneOptions>),
Cli(Box<LoggingOptions>),
}
@@ -158,10 +132,9 @@ impl Options {
pub fn node_id(&self) -> Option<String> {
match self {
Options::Metasrv(_) | Options::Cli(_) => None,
Options::Metasrv(_) | Options::Cli(_) | Options::Standalone(_) => None,
Options::Datanode(opt) => opt.node_id.map(|x| x.to_string()),
Options::Frontend(opt) => opt.node_id.clone(),
Options::Standalone(opt) => opt.frontend.node_id.clone(),
}
}
}

View File

@@ -40,6 +40,7 @@ use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use frontend::error::{Result as FeResult, TomlFormatSnafu};
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -61,7 +62,7 @@ use crate::error::{
Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, MixOptions, Options};
use crate::options::{GlobalOptions, Options};
use crate::App;
#[derive(Parser)]
@@ -71,7 +72,7 @@ pub struct Command {
}
impl Command {
pub async fn build(self, opts: MixOptions) -> Result<Instance> {
pub async fn build(self, opts: StandaloneOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
@@ -86,7 +87,7 @@ enum SubCommand {
}
impl SubCommand {
async fn build(self, opts: MixOptions) -> Result<Instance> {
async fn build(self, opts: StandaloneOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => cmd.build(opts).await,
}
@@ -158,37 +159,43 @@ impl Default for StandaloneOptions {
}
impl StandaloneOptions {
fn frontend_options(self) -> FrontendOptions {
pub fn frontend_options(&self) -> FrontendOptions {
let cloned_opts = self.clone();
FrontendOptions {
mode: self.mode,
default_timezone: self.default_timezone,
http: self.http,
grpc: self.grpc,
mysql: self.mysql,
postgres: self.postgres,
opentsdb: self.opentsdb,
influxdb: self.influxdb,
prom_store: self.prom_store,
mode: cloned_opts.mode,
default_timezone: cloned_opts.default_timezone,
http: cloned_opts.http,
grpc: cloned_opts.grpc,
mysql: cloned_opts.mysql,
postgres: cloned_opts.postgres,
opentsdb: cloned_opts.opentsdb,
influxdb: cloned_opts.influxdb,
prom_store: cloned_opts.prom_store,
meta_client: None,
logging: self.logging,
user_provider: self.user_provider,
logging: cloned_opts.logging,
user_provider: cloned_opts.user_provider,
// Handle the export metrics task run by standalone to frontend for execution
export_metrics: self.export_metrics,
export_metrics: cloned_opts.export_metrics,
..Default::default()
}
}
fn datanode_options(self) -> DatanodeOptions {
pub fn datanode_options(&self) -> DatanodeOptions {
let cloned_opts = self.clone();
DatanodeOptions {
node_id: Some(0),
enable_telemetry: self.enable_telemetry,
wal: self.wal.into(),
storage: self.storage,
region_engine: self.region_engine,
rpc_addr: self.grpc.addr,
enable_telemetry: cloned_opts.enable_telemetry,
wal: cloned_opts.wal.into(),
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
rpc_addr: cloned_opts.grpc.addr,
..Default::default()
}
}
pub fn to_toml(&self) -> FeResult<String> {
toml::to_string(self).context(TomlFormatSnafu)
}
}
pub struct Instance {
@@ -277,20 +284,12 @@ pub struct StartCommand {
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
let opts: StandaloneOptions = Options::load_layered_options(
let mut opts: StandaloneOptions = Options::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
StandaloneOptions::env_list_keys(),
)?;
self.convert_options(global_options, opts)
}
pub fn convert_options(
&self,
global_options: &GlobalOptions,
mut opts: StandaloneOptions,
) -> Result<Options> {
opts.mode = Mode::Standalone;
if let Some(dir) = &global_options.log_dir {
@@ -323,8 +322,7 @@ impl StartCommand {
msg: format!(
"gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
),
}
.fail();
}.fail();
}
opts.grpc.addr.clone_from(addr)
}
@@ -347,47 +345,32 @@ impl StartCommand {
opts.user_provider.clone_from(&self.user_provider);
let metadata_store = opts.metadata_store.clone();
let procedure = opts.procedure.clone();
let frontend = opts.clone().frontend_options();
let logging = opts.logging.clone();
let wal_meta = opts.wal.clone().into();
let datanode = opts.datanode_options().clone();
Ok(Options::Standalone(Box::new(MixOptions {
procedure,
metadata_store,
data_home: datanode.storage.data_home.to_string(),
frontend,
datanode,
logging,
wal_meta,
})))
Ok(Options::Standalone(Box::new(opts)))
}
#[allow(unreachable_code)]
#[allow(unused_variables)]
#[allow(clippy::diverging_sub_expression)]
async fn build(self, opts: MixOptions) -> Result<Instance> {
async fn build(self, opts: StandaloneOptions) -> Result<Instance> {
info!("Standalone start command: {:#?}", self);
info!("Building standalone instance with {opts:#?}");
let mut fe_opts = opts.frontend;
let mut fe_opts = opts.frontend_options();
#[allow(clippy::unnecessary_mut_passed)]
let fe_plugins = plugins::setup_frontend_plugins(&mut fe_opts) // mut ref is MUST, DO NOT change it
.await
.context(StartFrontendSnafu)?;
let dn_opts = opts.datanode;
let dn_opts = opts.datanode_options();
set_default_timezone(fe_opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
let data_home = &dn_opts.storage.data_home;
// Ensure the data_home directory exists.
fs::create_dir_all(path::Path::new(&opts.data_home)).context(CreateDirSnafu {
dir: &opts.data_home,
})?;
fs::create_dir_all(path::Path::new(data_home))
.context(CreateDirSnafu { dir: data_home })?;
let metadata_dir = metadata_store_dir(&opts.data_home);
let metadata_dir = metadata_store_dir(data_home);
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
metadata_dir,
opts.metadata_store.clone(),
@@ -424,7 +407,7 @@ impl StartCommand {
.build(),
);
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
opts.wal_meta.clone(),
opts.wal.into(),
kv_backend.clone(),
));
let table_metadata_manager =
@@ -621,8 +604,8 @@ mod tests {
else {
unreachable!()
};
let fe_opts = options.frontend;
let dn_opts = options.datanode;
let fe_opts = options.frontend_options();
let dn_opts = options.datanode_options();
let logging_opts = options.logging;
assert_eq!(Mode::Standalone, fe_opts.mode);
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
@@ -759,11 +742,12 @@ mod tests {
assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.frontend.http.addr, "127.0.0.1:14000");
assert_eq!(ReadableSize::mb(64), opts.frontend.http.body_limit);
let fe_opts = opts.frontend_options();
assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
// Should be default value.
assert_eq!(opts.frontend.grpc.addr, GrpcOptions::default().addr);
assert_eq!(fe_opts.grpc.addr, GrpcOptions::default().addr);
},
);
}

View File

@@ -75,6 +75,25 @@ impl From<StandaloneWalConfig> for MetasrvWalConfig {
}
}
impl From<MetasrvWalConfig> for StandaloneWalConfig {
fn from(config: MetasrvWalConfig) -> Self {
match config {
MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
MetasrvWalConfig::Kafka(config) => Self::Kafka(StandaloneKafkaConfig {
broker_endpoints: config.broker_endpoints,
num_topics: config.num_topics,
selector_type: config.selector_type,
topic_name_prefix: config.topic_name_prefix,
num_partitions: config.num_partitions,
replication_factor: config.replication_factor,
create_topic_timeout: config.create_topic_timeout,
backoff: config.backoff,
..Default::default()
}),
}
}
}
impl From<StandaloneWalConfig> for DatanodeWalConfig {
fn from(config: StandaloneWalConfig) -> Self {
match config {

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use catalog::kvbackend::KvBackendCatalogManager;
use cmd::options::MixOptions;
use cmd::standalone::StandaloneOptions;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::KvBackendConfig;
@@ -32,10 +32,8 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
@@ -45,7 +43,7 @@ use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, Test
pub struct GreptimeDbStandalone {
pub instance: Arc<Instance>,
pub mix_options: MixOptions,
pub opts: StandaloneOptions,
pub guard: TestGuard,
// Used in rebuild.
pub kv_backend: KvBackendRef,
@@ -115,13 +113,13 @@ impl GreptimeDbStandaloneBuilder {
&self,
kv_backend: KvBackendRef,
guard: TestGuard,
mix_options: MixOptions,
opts: StandaloneOptions,
procedure_manager: ProcedureManagerRef,
register_procedure_loaders: bool,
) -> GreptimeDbStandalone {
let plugins = self.plugin.clone().unwrap_or_default();
let datanode = DatanodeBuilder::new(mix_options.datanode.clone(), plugins.clone())
let datanode = DatanodeBuilder::new(opts.datanode_options(), plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
@@ -154,7 +152,7 @@ impl GreptimeDbStandaloneBuilder {
.build(),
);
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
mix_options.wal_meta.clone(),
opts.wal.clone().into(),
kv_backend.clone(),
));
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
@@ -202,7 +200,7 @@ impl GreptimeDbStandaloneBuilder {
GreptimeDbStandalone {
instance: Arc::new(instance),
mix_options,
opts,
guard,
kv_backend,
procedure_manager,
@@ -231,18 +229,15 @@ impl GreptimeDbStandaloneBuilder {
.await
.unwrap();
let wal_meta = self.metasrv_wal_config.clone();
let mix_options = MixOptions {
data_home: opts.storage.data_home.to_string(),
let standalone_opts = StandaloneOptions {
storage: opts.storage,
procedure: procedure_config,
metadata_store: kv_backend_config,
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
wal: self.metasrv_wal_config.clone().into(),
..StandaloneOptions::default()
};
self.build_with(kv_backend, guard, mix_options, procedure_manager, true)
self.build_with(kv_backend, guard, standalone_opts, procedure_manager, true)
.await
}
}

View File

@@ -33,7 +33,6 @@ use datanode::config::{
AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
StorageConfig,
};
use frontend::frontend::TomlSerializable;
use frontend::instance::Instance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use futures::future::BoxFuture;
@@ -392,7 +391,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
None,
)
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.mix_options.datanode.to_toml_string())
.with_greptime_config_options(instance.opts.datanode_options().to_toml_string())
.build();
(http_server.build(http_server.make_app()), instance.guard)
}
@@ -425,7 +424,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()),
Some(instance.instance.clone()),
)
.with_greptime_config_options(instance.mix_options.to_toml().unwrap());
.with_greptime_config_options(instance.opts.to_toml().unwrap());
if let Some(user_provider) = user_provider {
http_server = http_server.with_user_provider(user_provider);
@@ -464,7 +463,7 @@ pub async fn setup_test_prom_app_with_frontend(
)
.with_prom_handler(frontend_ref.clone(), true, is_strict_mode)
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.mix_options.datanode.to_toml_string())
.with_greptime_config_options(instance.opts.datanode_options().to_toml_string())
.build();
let app = http_server.build(http_server.make_app());
(app, instance.guard)

View File

@@ -106,7 +106,7 @@ impl MockInstanceBuilder {
unreachable!()
};
let GreptimeDbStandalone {
mix_options,
opts,
guard,
kv_backend,
procedure_manager,
@@ -114,7 +114,7 @@ impl MockInstanceBuilder {
} = instance;
MockInstanceImpl::Standalone(
builder
.build_with(kv_backend, guard, mix_options, procedure_manager, false)
.build_with(kv_backend, guard, opts, procedure_manager, false)
.await,
)
}

View File

@@ -729,103 +729,54 @@ pub async fn test_config_api(store_type: StorageType) {
let expected_toml_str = format!(
r#"
[procedure]
max_retry_times = 3
retry_delay = "500ms"
[metadata_store]
file_size = "256MiB"
purge_threshold = "4GiB"
[frontend]
mode = "standalone"
enable_telemetry = true
[frontend.heartbeat]
interval = "18s"
retry_interval = "3s"
[frontend.http]
[http]
addr = "127.0.0.1:4000"
timeout = "30s"
body_limit = "64MiB"
is_strict_mode = false
[frontend.grpc]
[grpc]
addr = "127.0.0.1:4001"
runtime_size = 8
max_recv_message_size = "512MiB"
max_send_message_size = "512MiB"
[frontend.mysql]
[mysql]
enable = true
addr = "127.0.0.1:4002"
runtime_size = 2
[frontend.mysql.tls]
[mysql.tls]
mode = "disable"
cert_path = ""
key_path = ""
watch = false
[frontend.postgres]
[postgres]
enable = true
addr = "127.0.0.1:4003"
runtime_size = 2
[frontend.postgres.tls]
[postgres.tls]
mode = "disable"
cert_path = ""
key_path = ""
watch = false
[frontend.opentsdb]
[opentsdb]
enable = true
[frontend.influxdb]
[influxdb]
enable = true
[frontend.prom_store]
[prom_store]
enable = true
with_metric_engine = true
[frontend.otlp]
enable = true
[frontend.logging]
enable_otlp_tracing = false
append_stdout = true
[frontend.datanode.client]
timeout = "10s"
connect_timeout = "1s"
tcp_nodelay = true
[frontend.export_metrics]
enable = false
write_interval = "30s"
[datanode]
mode = "standalone"
node_id = 0
require_lease_before_startup = true
init_regions_in_background = false
rpc_addr = "127.0.0.1:3001"
rpc_runtime_size = 8
rpc_max_recv_message_size = "512MiB"
rpc_max_send_message_size = "512MiB"
enable_telemetry = true
[datanode.heartbeat]
interval = "3s"
retry_interval = "3s"
[datanode.http]
addr = "127.0.0.1:4000"
timeout = "30s"
body_limit = "64MiB"
is_strict_mode = false
[datanode.wal]
[wal]
provider = "raft_engine"
file_size = "256MiB"
purge_threshold = "4GiB"
@@ -835,13 +786,25 @@ sync_write = false
enable_log_recycle = true
prefill_log_files = false
[datanode.storage]
[storage]
type = "{}"
providers = []
[[datanode.region_engine]]
[metadata_store]
file_size = "256MiB"
purge_threshold = "4GiB"
[datanode.region_engine.mito]
[procedure]
max_retry_times = 3
retry_delay = "500ms"
[logging]
enable_otlp_tracing = false
append_stdout = true
[[region_engine]]
[region_engine.mito]
worker_channel_size = 128
worker_request_batch_size = 64
manifest_checkpoint_distance = 10
@@ -855,7 +818,7 @@ sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
[datanode.region_engine.mito.inverted_index]
[region_engine.mito.inverted_index]
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
@@ -863,29 +826,20 @@ write_buffer_size = "8MiB"
mem_threshold_on_create = "64.0MiB"
intermediate_path = ""
[datanode.region_engine.mito.memtable]
[region_engine.mito.memtable]
type = "time_series"
[[datanode.region_engine]]
[[region_engine]]
[datanode.region_engine.file]
[region_engine.file]
[datanode.logging]
enable_otlp_tracing = false
append_stdout = true
[datanode.export_metrics]
[export_metrics]
enable = false
write_interval = "30s"
[logging]
enable_otlp_tracing = false
append_stdout = true
[wal_meta]
provider = "raft_engine""#,
store_type,
);
write_interval = "30s""#,
store_type
)
.trim()
.to_string();
let body_text = drop_lines_with_inconsistent_results(res_get.text().await);
assert_eq!(body_text, expected_toml_str);
}