refactor: make the command entry cleaner (#3981)

* refactor: move run() in App trait

* refactor: introduce AppBuilder trait

* chore: remove AppBuilder

* refactor: remove Options struct and make the start() clean

* refactor: init once for common_telemetry::init_global_logging
This commit is contained in:
zyy17
2024-05-20 11:34:06 +08:00
committed by GitHub
parent df13832a59
commit 82c3eca25e
10 changed files with 312 additions and 399 deletions

View File

@@ -14,12 +14,10 @@
#![doc = include_str!("../../../../README.md")]
use std::fmt;
use clap::{Parser, Subcommand};
use cmd::error::Result;
use cmd::options::{GlobalOptions, Options};
use cmd::{cli, datanode, frontend, log_versions, metasrv, standalone, start_app, App};
use cmd::options::GlobalOptions;
use cmd::{cli, datanode, frontend, log_versions, metasrv, standalone, App};
use common_version::{short_version, version};
#[derive(Parser)]
@@ -56,58 +54,6 @@ enum SubCommand {
Cli(cli::Command),
}
impl SubCommand {
async fn build(self, opts: Options) -> Result<Box<dyn App>> {
let app: Box<dyn App> = match (self, opts) {
(SubCommand::Datanode(cmd), Options::Datanode(dn_opts)) => {
let app = cmd.build(*dn_opts).await?;
Box::new(app) as _
}
(SubCommand::Frontend(cmd), Options::Frontend(fe_opts)) => {
let app = cmd.build(*fe_opts).await?;
Box::new(app) as _
}
(SubCommand::Metasrv(cmd), Options::Metasrv(meta_opts)) => {
let app = cmd.build(*meta_opts).await?;
Box::new(app) as _
}
(SubCommand::Standalone(cmd), Options::Standalone(opts)) => {
let app = cmd.build(*opts).await?;
Box::new(app) as _
}
(SubCommand::Cli(cmd), Options::Cli(_)) => {
let app = cmd.build().await?;
Box::new(app) as _
}
_ => unreachable!(),
};
Ok(app)
}
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
match self {
SubCommand::Datanode(cmd) => cmd.load_options(global_options),
SubCommand::Frontend(cmd) => cmd.load_options(global_options),
SubCommand::Metasrv(cmd) => cmd.load_options(global_options),
SubCommand::Standalone(cmd) => cmd.load_options(global_options),
SubCommand::Cli(cmd) => cmd.load_options(global_options),
}
}
}
impl fmt::Display for SubCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SubCommand::Datanode(..) => write!(f, "greptime-datanode"),
SubCommand::Frontend(..) => write!(f, "greptime-frontend"),
SubCommand::Metasrv(..) => write!(f, "greptime-metasrv"),
SubCommand::Standalone(..) => write!(f, "greptime-standalone"),
SubCommand::Cli(_) => write!(f, "greptime-cli"),
}
}
}
#[cfg(not(windows))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
@@ -115,28 +61,43 @@ static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[tokio::main]
async fn main() -> Result<()> {
setup_human_panic();
log_versions(version!(), short_version!());
start(Command::parse()).await
}
async fn start(cli: Command) -> Result<()> {
let subcmd = cli.subcmd;
let app_name = subcmd.to_string();
let opts = subcmd.load_options(&cli.global_options)?;
let _guard = common_telemetry::init_global_logging(
&app_name,
opts.logging_options(),
&cli.global_options.tracing_options(),
opts.node_id(),
);
log_versions(version!(), short_version!());
let app = subcmd.build(opts).await?;
start_app(app).await
match cli.subcmd {
SubCommand::Datanode(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)
.await?
.run()
.await
}
SubCommand::Frontend(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)
.await?
.run()
.await
}
SubCommand::Metasrv(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)
.await?
.run()
.await
}
SubCommand::Standalone(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)
.await?
.run()
.await
}
SubCommand::Cli(cmd) => {
cmd.build(cmd.load_options(&cli.global_options)?)
.await?
.run()
.await
}
}
}
fn setup_human_panic() {

View File

@@ -30,15 +30,17 @@ mod upgrade;
use async_trait::async_trait;
use bench::BenchTableMetadataCommand;
use clap::Parser;
use common_telemetry::logging::LoggingOptions;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
// pub use repl::Repl;
use upgrade::UpgradeCommand;
use self::export::ExportCommand;
use crate::error::Result;
use crate::options::{GlobalOptions, Options};
use crate::options::GlobalOptions;
use crate::App;
pub const APP_NAME: &str = "greptime-cli";
#[async_trait]
pub trait Tool: Send + Sync {
async fn do_work(&self) -> Result<()>;
@@ -57,7 +59,7 @@ impl Instance {
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-cli"
APP_NAME
}
async fn start(&mut self) -> Result<()> {
@@ -80,11 +82,18 @@ pub struct Command {
}
impl Command {
pub async fn build(self) -> Result<Instance> {
pub async fn build(&self, opts: LoggingOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
APP_NAME,
&opts,
&TracingOptions::default(),
None,
);
self.cmd.build().await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<LoggingOptions> {
let mut logging_opts = LoggingOptions::default();
if let Some(dir) = &global_options.log_dir {
@@ -93,7 +102,7 @@ impl Command {
logging_opts.level.clone_from(&global_options.log_level);
Ok(Options::Cli(Box::new(logging_opts)))
Ok(logging_opts)
}
}
@@ -106,7 +115,7 @@ enum SubCommand {
}
impl SubCommand {
async fn build(self) -> Result<Instance> {
async fn build(&self) -> Result<Instance> {
match self {
// SubCommand::Attach(cmd) => cmd.build().await,
SubCommand::Upgrade(cmd) => cmd.build().await,

View File

@@ -32,9 +32,11 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
};
use crate::options::{GlobalOptions, Options};
use crate::options::GlobalOptions;
use crate::App;
pub const APP_NAME: &str = "greptime-datanode";
pub struct Instance {
datanode: Datanode,
}
@@ -56,7 +58,7 @@ impl Instance {
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-datanode"
APP_NAME
}
async fn start(&mut self) -> Result<()> {
@@ -82,11 +84,11 @@ pub struct Command {
}
impl Command {
pub async fn build(self, opts: DatanodeOptions) -> Result<Instance> {
pub async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
self.subcmd.load_options(global_options)
}
}
@@ -97,13 +99,13 @@ enum SubCommand {
}
impl SubCommand {
async fn build(self, opts: DatanodeOptions) -> Result<Instance> {
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
match self {
SubCommand::Start(cmd) => cmd.load_options(global_options),
}
@@ -135,17 +137,15 @@ struct StartCommand {
}
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
Ok(Options::Datanode(Box::new(
self.merge_with_cli_options(
global_options,
DatanodeOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
)?,
)))
fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
self.merge_with_cli_options(
global_options,
DatanodeOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
)
}
// The precedence order is: cli > config file > environment variables > default values.
@@ -226,7 +226,14 @@ impl StartCommand {
Ok(opts)
}
async fn build(self, mut opts: DatanodeOptions) -> Result<Instance> {
async fn build(&self, mut opts: DatanodeOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
opts.node_id.map(|x| x.to_string()),
);
let plugins = plugins::setup_datanode_plugins(&mut opts)
.await
.context(StartDatanodeSnafu)?;
@@ -337,10 +344,7 @@ mod tests {
..Default::default()
};
let Options::Datanode(options) = cmd.load_options(&GlobalOptions::default()).unwrap()
else {
unreachable!()
};
let options = cmd.load_options(&GlobalOptions::default()).unwrap();
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!(Some(42), options.node_id);
@@ -399,23 +403,19 @@ mod tests {
#[test]
fn test_try_from_cmd() {
if let Options::Datanode(opt) = StartCommand::default()
let opt = StartCommand::default()
.load_options(&GlobalOptions::default())
.unwrap()
{
assert_eq!(Mode::Standalone, opt.mode)
}
.unwrap();
assert_eq!(Mode::Standalone, opt.mode);
if let Options::Datanode(opt) = (StartCommand {
let opt = (StartCommand {
node_id: Some(42),
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
..Default::default()
})
.load_options(&GlobalOptions::default())
.unwrap()
{
assert_eq!(Mode::Distributed, opt.mode)
}
.unwrap();
assert_eq!(Mode::Distributed, opt.mode);
assert!((StartCommand {
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
@@ -447,7 +447,7 @@ mod tests {
})
.unwrap();
let logging_opt = options.logging_options();
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}
@@ -527,11 +527,7 @@ mod tests {
..Default::default()
};
let Options::Datanode(opts) =
command.load_options(&GlobalOptions::default()).unwrap()
else {
unreachable!()
};
let opts = command.load_options(&GlobalOptions::default()).unwrap();
// Should be read from env, env > default values.
let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {

View File

@@ -45,13 +45,15 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu,
};
use crate::options::{GlobalOptions, Options};
use crate::options::GlobalOptions;
use crate::App;
pub struct Instance {
frontend: FeInstance,
}
pub const APP_NAME: &str = "greptime-frontend";
impl Instance {
pub fn new(frontend: FeInstance) -> Self {
Self { frontend }
@@ -69,7 +71,7 @@ impl Instance {
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-frontend"
APP_NAME
}
async fn start(&mut self) -> Result<()> {
@@ -95,11 +97,11 @@ pub struct Command {
}
impl Command {
pub async fn build(self, opts: FrontendOptions) -> Result<Instance> {
pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
self.subcmd.load_options(global_options)
}
}
@@ -110,13 +112,13 @@ enum SubCommand {
}
impl SubCommand {
async fn build(self, opts: FrontendOptions) -> Result<Instance> {
async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
match self {
SubCommand::Start(cmd) => cmd.load_options(global_options),
}
@@ -156,17 +158,15 @@ pub struct StartCommand {
}
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
Ok(Options::Frontend(Box::new(
self.merge_with_cli_options(
global_options,
FrontendOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
)?,
)))
fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
self.merge_with_cli_options(
global_options,
FrontendOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
)
}
// The precedence order is: cli > config file > environment variables > default values.
@@ -239,7 +239,14 @@ impl StartCommand {
Ok(opts)
}
async fn build(self, mut opts: FrontendOptions) -> Result<Instance> {
async fn build(&self, mut opts: FrontendOptions) -> Result<Instance> {
let _guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
opts.node_id.clone(),
);
#[allow(clippy::unnecessary_mut_passed)]
let plugins = plugins::setup_frontend_plugins(&mut opts)
.await
@@ -379,10 +386,7 @@ mod tests {
..Default::default()
};
let Options::Frontend(opts) = command.load_options(&GlobalOptions::default()).unwrap()
else {
unreachable!()
};
let opts = command.load_options(&GlobalOptions::default()).unwrap();
assert_eq!(opts.http.addr, "127.0.0.1:1234");
assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
@@ -430,10 +434,7 @@ mod tests {
..Default::default()
};
let Options::Frontend(fe_opts) = command.load_options(&GlobalOptions::default()).unwrap()
else {
unreachable!()
};
let fe_opts = command.load_options(&GlobalOptions::default()).unwrap();
assert_eq!(Mode::Distributed, fe_opts.mode);
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
assert_eq!(Duration::from_secs(30), fe_opts.http.timeout);
@@ -486,7 +487,7 @@ mod tests {
})
.unwrap();
let logging_opt = options.logging_options();
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}
@@ -562,11 +563,7 @@ mod tests {
..Default::default()
};
let Options::Frontend(fe_opts) =
command.load_options(&GlobalOptions::default()).unwrap()
else {
unreachable!()
};
let fe_opts = command.load_options(&GlobalOptions::default()).unwrap();
// Should be read from env, env > default values.
assert_eq!(fe_opts.mysql.runtime_size, 11);

View File

@@ -17,6 +17,8 @@
use async_trait::async_trait;
use common_telemetry::{error, info};
use crate::error::Result;
pub mod cli;
pub mod datanode;
pub mod error;
@@ -35,39 +37,39 @@ pub trait App: Send {
fn name(&self) -> &str;
/// A hook for implementor to make something happened before actual startup. Defaults to no-op.
async fn pre_start(&mut self) -> error::Result<()> {
async fn pre_start(&mut self) -> Result<()> {
Ok(())
}
async fn start(&mut self) -> error::Result<()>;
async fn start(&mut self) -> Result<()>;
/// Waits the quit signal by default.
fn wait_signal(&self) -> bool {
true
}
async fn stop(&self) -> error::Result<()>;
}
async fn stop(&self) -> Result<()>;
pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
info!("Starting app: {}", app.name());
async fn run(&mut self) -> Result<()> {
info!("Starting app: {}", self.name());
app.pre_start().await?;
self.pre_start().await?;
app.start().await?;
self.start().await?;
if app.wait_signal() {
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
if self.wait_signal() {
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}
}
}
app.stop().await?;
info!("Goodbye!");
Ok(())
self.stop().await?;
info!("Goodbye!");
Ok(())
}
}
/// Log the versions of the application, and the arguments passed to the cli.

View File

@@ -24,9 +24,11 @@ use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
use crate::options::{GlobalOptions, Options};
use crate::options::GlobalOptions;
use crate::App;
pub const APP_NAME: &str = "greptime-metasrv";
pub struct Instance {
instance: MetasrvInstance,
}
@@ -40,7 +42,7 @@ impl Instance {
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-metasrv"
APP_NAME
}
async fn start(&mut self) -> Result<()> {
@@ -66,11 +68,11 @@ pub struct Command {
}
impl Command {
pub async fn build(self, opts: MetasrvOptions) -> Result<Instance> {
pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
self.subcmd.load_options(global_options)
}
}
@@ -81,13 +83,13 @@ enum SubCommand {
}
impl SubCommand {
async fn build(self, opts: MetasrvOptions) -> Result<Instance> {
async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
match self {
SubCommand::Start(cmd) => cmd.load_options(global_options),
}
@@ -128,17 +130,15 @@ struct StartCommand {
}
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
Ok(Options::Metasrv(Box::new(
self.merge_with_cli_options(
global_options,
MetasrvOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
)?,
)))
fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
self.merge_with_cli_options(
global_options,
MetasrvOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
)
}
// The precedence order is: cli > config file > environment variables > default values.
@@ -212,7 +212,10 @@ impl StartCommand {
Ok(opts)
}
async fn build(self, mut opts: MetasrvOptions) -> Result<Instance> {
async fn build(&self, mut opts: MetasrvOptions) -> Result<Instance> {
let _guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
let plugins = plugins::setup_metasrv_plugins(&mut opts)
.await
.context(StartMetaServerSnafu)?;
@@ -254,9 +257,7 @@ mod tests {
..Default::default()
};
let Options::Metasrv(options) = cmd.load_options(&GlobalOptions::default()).unwrap() else {
unreachable!()
};
let options = cmd.load_options(&GlobalOptions::default()).unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
assert_eq!(SelectorType::LoadBased, options.selector);
@@ -289,9 +290,7 @@ mod tests {
..Default::default()
};
let Options::Metasrv(options) = cmd.load_options(&GlobalOptions::default()).unwrap() else {
unreachable!()
};
let options = cmd.load_options(&GlobalOptions::default()).unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
@@ -343,7 +342,7 @@ mod tests {
})
.unwrap();
let logging_opt = options.logging_options();
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
assert_eq!("debug", logging_opt.level.as_ref().unwrap());
}
@@ -398,11 +397,7 @@ mod tests {
..Default::default()
};
let Options::Metasrv(opts) =
command.load_options(&GlobalOptions::default()).unwrap()
else {
unreachable!()
};
let opts = command.load_options(&GlobalOptions::default()).unwrap();
// Should be read from env, env > default values.
assert_eq!(opts.bind_addr, "127.0.0.1:14002");

View File

@@ -13,20 +13,6 @@
// limitations under the License.
use clap::Parser;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use datanode::config::DatanodeOptions;
use frontend::frontend::FrontendOptions;
use meta_srv::metasrv::MetasrvOptions;
use crate::standalone::StandaloneOptions;
pub enum Options {
Datanode(Box<DatanodeOptions>),
Frontend(Box<FrontendOptions>),
Metasrv(Box<MetasrvOptions>),
Standalone(Box<StandaloneOptions>),
Cli(Box<LoggingOptions>),
}
#[derive(Parser, Default, Debug, Clone)]
pub struct GlobalOptions {
@@ -43,32 +29,3 @@ pub struct GlobalOptions {
#[arg(global = true)]
pub tokio_console_addr: Option<String>,
}
impl GlobalOptions {
pub fn tracing_options(&self) -> TracingOptions {
TracingOptions {
#[cfg(feature = "tokio-console")]
tokio_console_addr: self.tokio_console_addr.clone(),
}
}
}
impl Options {
pub fn logging_options(&self) -> &LoggingOptions {
match self {
Options::Datanode(opts) => &opts.logging,
Options::Frontend(opts) => &opts.logging,
Options::Metasrv(opts) => &opts.logging,
Options::Standalone(opts) => &opts.logging,
Options::Cli(opts) => opts,
}
}
pub fn node_id(&self) -> Option<String> {
match self {
Options::Metasrv(_) | Options::Cli(_) | Options::Standalone(_) => None,
Options::Datanode(opt) => opt.node_id.map(|x| x.to_string()),
Options::Frontend(opt) => opt.node_id.clone(),
}
}
}

View File

@@ -68,9 +68,11 @@ use crate::error::{
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, Options};
use crate::options::GlobalOptions;
use crate::App;
pub const APP_NAME: &str = "greptime-standalone";
#[derive(Parser)]
pub struct Command {
#[clap(subcommand)]
@@ -78,11 +80,11 @@ pub struct Command {
}
impl Command {
pub async fn build(self, opts: StandaloneOptions) -> Result<Instance> {
pub async fn build(&self, opts: StandaloneOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<StandaloneOptions> {
self.subcmd.load_options(global_options)
}
}
@@ -93,13 +95,13 @@ enum SubCommand {
}
impl SubCommand {
async fn build(self, opts: StandaloneOptions) -> Result<Instance> {
async fn build(&self, opts: StandaloneOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
fn load_options(&self, global_options: &GlobalOptions) -> Result<StandaloneOptions> {
match self {
SubCommand::Start(cmd) => cmd.load_options(global_options),
}
@@ -212,7 +214,7 @@ pub struct Instance {
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-standalone"
APP_NAME
}
async fn start(&mut self) -> Result<()> {
@@ -287,17 +289,15 @@ pub struct StartCommand {
}
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<Options> {
Ok(Options::Standalone(Box::new(
self.merge_with_cli_options(
global_options,
StandaloneOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
)?,
)))
fn load_options(&self, global_options: &GlobalOptions) -> Result<StandaloneOptions> {
self.merge_with_cli_options(
global_options,
StandaloneOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
)
}
// The precedence order is: cli > config file > environment variables > default values.
@@ -373,7 +373,10 @@ impl StartCommand {
#[allow(unreachable_code)]
#[allow(unused_variables)]
#[allow(clippy::diverging_sub_expression)]
async fn build(self, opts: StandaloneOptions) -> Result<Instance> {
async fn build(&self, opts: StandaloneOptions) -> Result<Instance> {
let _guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
info!("Standalone start command: {:#?}", self);
info!("Building standalone instance with {opts:#?}");
@@ -665,10 +668,7 @@ mod tests {
..Default::default()
};
let Options::Standalone(options) = cmd.load_options(&GlobalOptions::default()).unwrap()
else {
unreachable!()
};
let options = cmd.load_options(&GlobalOptions::default()).unwrap();
let fe_opts = options.frontend_options();
let dn_opts = options.datanode_options();
let logging_opts = options.logging;
@@ -721,7 +721,7 @@ mod tests {
..Default::default()
};
let Options::Standalone(opts) = cmd
let opts = cmd
.load_options(&GlobalOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
@@ -729,10 +729,7 @@ mod tests {
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
})
.unwrap()
else {
unreachable!()
};
.unwrap();
assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir);
assert_eq!("debug", opts.logging.level.unwrap());
@@ -794,11 +791,7 @@ mod tests {
..Default::default()
};
let Options::Standalone(opts) =
command.load_options(&GlobalOptions::default()).unwrap()
else {
unreachable!()
};
let opts = command.load_options(&GlobalOptions::default()).unwrap();
// Should be read from env, env > default values.
assert_eq!(opts.logging.dir, "/other/log/dir");

View File

@@ -124,139 +124,144 @@ pub fn init_global_logging(
tracing_opts: &TracingOptions,
node_id: Option<String>,
) -> Vec<WorkerGuard> {
static START: Once = Once::new();
let mut guards = vec![];
let dir = &opts.dir;
let level = &opts.level;
let enable_otlp_tracing = opts.enable_otlp_tracing;
// Enable log compatible layer to convert log record to tracing span.
LogTracer::init().expect("log tracer must be valid");
START.call_once(|| {
let dir = &opts.dir;
let level = &opts.level;
let enable_otlp_tracing = opts.enable_otlp_tracing;
// stdout log layer.
let stdout_logging_layer = if opts.append_stdout {
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(stdout_guard);
// Enable log compatible layer to convert log record to tracing span.
LogTracer::init().expect("log tracer must be valid");
Some(
Layer::new()
.with_writer(stdout_writer)
.with_ansi(atty::is(atty::Stream::Stdout)),
)
} else {
None
};
// file log layer.
let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name);
let (rolling_writer, rolling_writer_guard) = tracing_appender::non_blocking(rolling_appender);
let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false);
guards.push(rolling_writer_guard);
// error file log layer.
let err_rolling_appender =
RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err"));
let (err_rolling_writer, err_rolling_writer_guard) =
tracing_appender::non_blocking(err_rolling_appender);
let err_file_logging_layer = Layer::new()
.with_writer(err_rolling_writer)
.with_ansi(false);
guards.push(err_rolling_writer_guard);
// resolve log level settings from:
// - options from command line or config files
// - environment variable: RUST_LOG
// - default settings
let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok();
let targets_string = level
.as_deref()
.or(rust_log_env.as_deref())
.unwrap_or(DEFAULT_LOG_TARGETS);
let filter = targets_string
.parse::<filter::Targets>()
.expect("error parsing log level string");
let sampler = opts
.tracing_sample_ratio
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
// Must enable 'tokio_unstable' cfg to use this feature.
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
#[cfg(feature = "tokio-console")]
let subscriber = {
let tokio_console_layer = if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr
{
let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
});
println!("tokio-console listening on {addr}");
// stdout log layer.
let stdout_logging_layer = if opts.append_stdout {
let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(stdout_guard);
Some(
console_subscriber::ConsoleLayer::builder()
.server_addr(addr)
.spawn(),
Layer::new()
.with_writer(stdout_writer)
.with_ansi(atty::is(atty::Stream::Stdout)),
)
} else {
None
};
let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone()));
// file log layer.
let rolling_appender = RollingFileAppender::new(Rotation::HOURLY, dir, app_name);
let (rolling_writer, rolling_writer_guard) =
tracing_appender::non_blocking(rolling_appender);
let file_logging_layer = Layer::new().with_writer(rolling_writer).with_ansi(false);
guards.push(rolling_writer_guard);
let file_logging_layer = file_logging_layer.with_filter(filter);
// error file log layer.
let err_rolling_appender =
RollingFileAppender::new(Rotation::HOURLY, dir, format!("{}-{}", app_name, "err"));
let (err_rolling_writer, err_rolling_writer_guard) =
tracing_appender::non_blocking(err_rolling_appender);
let err_file_logging_layer = Layer::new()
.with_writer(err_rolling_writer)
.with_ansi(false);
guards.push(err_rolling_writer_guard);
Registry::default()
.with(tokio_console_layer)
// resolve log level settings from:
// - options from command line or config files
// - environment variable: RUST_LOG
// - default settings
let rust_log_env = std::env::var(EnvFilter::DEFAULT_ENV).ok();
let targets_string = level
.as_deref()
.or(rust_log_env.as_deref())
.unwrap_or(DEFAULT_LOG_TARGETS);
let filter = targets_string
.parse::<filter::Targets>()
.expect("error parsing log level string");
let sampler = opts
.tracing_sample_ratio
.as_ref()
.map(create_sampler)
.map(Sampler::ParentBased)
.unwrap_or(Sampler::ParentBased(Box::new(Sampler::AlwaysOn)));
// Must enable 'tokio_unstable' cfg to use this feature.
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
#[cfg(feature = "tokio-console")]
let subscriber = {
let tokio_console_layer =
if let Some(tokio_console_addr) = &tracing_opts.tokio_console_addr {
let addr: std::net::SocketAddr = tokio_console_addr.parse().unwrap_or_else(|e| {
panic!("Invalid binding address '{tokio_console_addr}' for tokio-console: {e}");
});
println!("tokio-console listening on {addr}");
Some(
console_subscriber::ConsoleLayer::builder()
.server_addr(addr)
.spawn(),
)
} else {
None
};
let stdout_logging_layer = stdout_logging_layer.map(|x| x.with_filter(filter.clone()));
let file_logging_layer = file_logging_layer.with_filter(filter);
Registry::default()
.with(tokio_console_layer)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR))
};
// consume the `tracing_opts`, to avoid "unused" warnings
let _ = tracing_opts;
#[cfg(not(feature = "tokio-console"))]
let subscriber = Registry::default()
.with(filter)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR))
};
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR));
// consume the `tracing_opts`, to avoid "unused" warnings
let _ = tracing_opts;
#[cfg(not(feature = "tokio-console"))]
let subscriber = Registry::default()
.with(filter)
.with(stdout_logging_layer)
.with(file_logging_layer)
.with(err_file_logging_layer.with_filter(filter::LevelFilter::ERROR));
if enable_otlp_tracing {
global::set_text_map_propagator(TraceContextPropagator::new());
// otlp exporter
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| format!("http://{}", e))
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_sampler(sampler)
.with_resource(opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(tracing_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
} else {
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
}
if enable_otlp_tracing {
global::set_text_map_propagator(TraceContextPropagator::new());
// otlp exporter
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| format!("http://{}", e))
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_sampler(sampler)
.with_resource(opentelemetry_sdk::Resource::new(vec![
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
KeyValue::new(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
let subscriber = subscriber.with(tracing_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
} else {
tracing::subscriber::set_global_default(subscriber)
.expect("error setting global tracing subscriber");
}
});
guards
}

View File

@@ -274,9 +274,10 @@ mod tests {
use clap::Parser;
use client::Client;
use cmd::error::Result as CmdResult;
use cmd::options::{GlobalOptions, Options};
use cmd::options::GlobalOptions;
use cmd::{cli, standalone, App};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::logging::LoggingOptions;
use super::{Database, FlightContext};
@@ -312,12 +313,9 @@ mod tests {
"--data-home",
&*output_dir.path().to_string_lossy(),
]);
let Options::Standalone(standalone_opts) =
standalone.load_options(&GlobalOptions::default())?
else {
unreachable!()
};
let mut instance = standalone.build(*standalone_opts).await?;
let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap();
let mut instance = standalone.build(standalone_opts).await?;
instance.start().await?;
let client = Client::with_urls(["127.0.0.1:4001"]);
@@ -348,7 +346,7 @@ mod tests {
"--target",
"create-table",
]);
let mut cli_app = cli.build().await?;
let mut cli_app = cli.build(LoggingOptions::default()).await?;
cli_app.start().await?;
instance.stop().await?;