refactor: make instance started separately (#2911)

* refactor: make instance started separately, to support further integrated into other binaries

* fix: resolve PR comments

* fix: resolve PR comments
This commit is contained in:
LFC
2023-12-14 11:44:29 +08:00
committed by GitHub
parent 99dda93f0e
commit 181e16a11a
12 changed files with 369 additions and 284 deletions

View File

@@ -16,79 +16,12 @@
use std::fmt;
use clap::Parser;
use clap::{FromArgMatches, Parser, Subcommand};
use cmd::error::Result;
use cmd::options::{Options, TopLevelOptions};
use cmd::{cli, datanode, frontend, metasrv, standalone};
use common_telemetry::logging::{error, info, TracingOptions};
lazy_static::lazy_static! {
static ref APP_VERSION: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("app_version", "app version", &["short_version", "version"]).unwrap();
}
#[derive(Parser)]
#[clap(name = "greptimedb", version = print_version())]
struct Command {
#[clap(long)]
log_dir: Option<String>,
#[clap(long)]
log_level: Option<String>,
#[clap(subcommand)]
subcmd: SubCommand,
#[cfg(feature = "tokio-console")]
#[clap(long)]
tokio_console_addr: Option<String>,
}
pub enum Application {
Datanode(datanode::Instance),
Frontend(frontend::Instance),
Metasrv(metasrv::Instance),
Standalone(standalone::Instance),
Cli(cli::Instance),
}
impl Application {
async fn start(&mut self) -> Result<()> {
match self {
Application::Datanode(instance) => instance.start().await,
Application::Frontend(instance) => instance.start().await,
Application::Metasrv(instance) => instance.start().await,
Application::Standalone(instance) => instance.start().await,
Application::Cli(instance) => instance.start().await,
}
}
async fn stop(&self) -> Result<()> {
match self {
Application::Datanode(instance) => instance.stop().await,
Application::Frontend(instance) => instance.stop().await,
Application::Metasrv(instance) => instance.stop().await,
Application::Standalone(instance) => instance.stop().await,
Application::Cli(instance) => instance.stop().await,
}
}
}
impl Command {
async fn build(self, opts: Options) -> Result<Application> {
self.subcmd.build(opts).await
}
fn load_options(&self) -> Result<Options> {
let top_level_opts = self.top_level_options();
self.subcmd.load_options(top_level_opts)
}
fn top_level_options(&self) -> TopLevelOptions {
TopLevelOptions {
log_dir: self.log_dir.clone(),
log_level: self.log_level.clone(),
}
}
}
use cmd::options::{CliOptions, Options};
use cmd::{
cli, datanode, frontend, greptimedb_cli, log_versions, metasrv, standalone, start_app, App,
};
#[derive(Parser)]
enum SubCommand {
@@ -105,40 +38,41 @@ enum SubCommand {
}
impl SubCommand {
async fn build(self, opts: Options) -> Result<Application> {
match (self, opts) {
async fn build(self, opts: Options) -> Result<Box<dyn App>> {
let app: Box<dyn App> = match (self, opts) {
(SubCommand::Datanode(cmd), Options::Datanode(dn_opts)) => {
let app = cmd.build(*dn_opts).await?;
Ok(Application::Datanode(app))
Box::new(app) as _
}
(SubCommand::Frontend(cmd), Options::Frontend(fe_opts)) => {
let app = cmd.build(*fe_opts).await?;
Ok(Application::Frontend(app))
Box::new(app) as _
}
(SubCommand::Metasrv(cmd), Options::Metasrv(meta_opts)) => {
let app = cmd.build(*meta_opts).await?;
Ok(Application::Metasrv(app))
Box::new(app) as _
}
(SubCommand::Standalone(cmd), Options::Standalone(opts)) => {
let app = cmd.build(*opts).await?;
Ok(Application::Standalone(app))
Box::new(app) as _
}
(SubCommand::Cli(cmd), Options::Cli(_)) => {
let app = cmd.build().await?;
Ok(Application::Cli(app))
Box::new(app) as _
}
_ => unreachable!(),
}
};
Ok(app)
}
fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
match self {
SubCommand::Datanode(cmd) => cmd.load_options(top_level_opts),
SubCommand::Frontend(cmd) => cmd.load_options(top_level_opts),
SubCommand::Metasrv(cmd) => cmd.load_options(top_level_opts),
SubCommand::Standalone(cmd) => cmd.load_options(top_level_opts),
SubCommand::Cli(cmd) => cmd.load_options(top_level_opts),
SubCommand::Datanode(cmd) => cmd.load_options(cli_options),
SubCommand::Frontend(cmd) => cmd.load_options(cli_options),
SubCommand::Metasrv(cmd) => cmd.load_options(cli_options),
SubCommand::Standalone(cmd) => cmd.load_options(cli_options),
SubCommand::Cli(cmd) => cmd.load_options(cli_options),
}
}
}
@@ -155,90 +89,41 @@ impl fmt::Display for SubCommand {
}
}
fn print_version() -> &'static str {
concat!(
"\nbranch: ",
env!("GIT_BRANCH"),
"\ncommit: ",
env!("GIT_COMMIT"),
"\ndirty: ",
env!("GIT_DIRTY"),
"\nversion: ",
env!("CARGO_PKG_VERSION")
)
}
fn short_version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// {app_name}-{branch_name}-{commit_short}
// The branch name (tag) of a release build should already contain the short
// version so the full version doesn't concat the short version explicitly.
fn full_version() -> &'static str {
concat!(
"greptimedb-",
env!("GIT_BRANCH"),
"-",
env!("GIT_COMMIT_SHORT")
)
}
fn log_env_flags() {
info!("command line arguments");
for argument in std::env::args() {
info!("argument: {}", argument);
}
}
#[cfg(not(windows))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[tokio::main]
async fn main() -> Result<()> {
let cmd = Command::parse();
let app_name = &cmd.subcmd.to_string();
common_telemetry::set_panic_hook();
let opts = cmd.load_options()?;
let logging_opts = opts.logging_options();
let tracing_opts = TracingOptions {
#[cfg(feature = "tokio-console")]
tokio_console_addr: cmd.tokio_console_addr.clone(),
let cli = greptimedb_cli();
let cli = SubCommand::augment_subcommands(cli);
let args = cli.get_matches();
let subcmd = match SubCommand::from_arg_matches(&args) {
Ok(subcmd) => subcmd,
Err(e) => e.exit(),
};
common_telemetry::set_panic_hook();
let _guard =
common_telemetry::init_global_logging(app_name, logging_opts, tracing_opts, opts.node_id());
let app_name = subcmd.to_string();
// Report app version as gauge.
APP_VERSION
.with_label_values(&[short_version(), full_version()])
.inc();
let cli_options = CliOptions::new(&args);
// Log version and argument flags.
info!(
"short_version: {}, full_version: {}",
short_version(),
full_version()
let opts = subcmd.load_options(&cli_options)?;
let _guard = common_telemetry::init_global_logging(
&app_name,
opts.logging_options(),
cli_options.tracing_options(),
opts.node_id(),
);
log_env_flags();
let mut app = cmd.build(opts).await?;
log_versions();
tokio::select! {
result = app.start() => {
if let Err(err) = result {
error!(err; "Fatal error occurs!");
}
}
_ = tokio::signal::ctrl_c() => {
if let Err(err) = app.stop().await {
error!(err; "Fatal error occurs!");
}
info!("Goodbye!");
}
}
let app = subcmd.build(opts).await?;
Ok(())
start_app(app).await
}

View File

@@ -13,9 +13,15 @@
// limitations under the License.
mod bench;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod repl;
// TODO(weny): Removes it
#[allow(deprecated)]
@@ -30,27 +36,35 @@ use upgrade::UpgradeCommand;
use self::export::ExportCommand;
use crate::error::Result;
use crate::options::{Options, TopLevelOptions};
use crate::options::{CliOptions, Options};
use crate::App;
#[async_trait]
pub trait Tool {
pub trait Tool: Send + Sync {
async fn do_work(&self) -> Result<()>;
}
pub enum Instance {
Repl(Repl),
Tool(Box<dyn Tool>),
pub struct Instance {
tool: Box<dyn Tool>,
}
impl Instance {
pub async fn start(&mut self) -> Result<()> {
match self {
Instance::Repl(repl) => repl.run().await,
Instance::Tool(tool) => tool.do_work().await,
}
fn new(tool: Box<dyn Tool>) -> Self {
Self { tool }
}
}
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-cli"
}
pub async fn stop(&self) -> Result<()> {
async fn start(&mut self) -> Result<()> {
self.tool.do_work().await
}
async fn stop(&self) -> Result<()> {
Ok(())
}
}
@@ -66,14 +80,15 @@ impl Command {
self.cmd.build().await
}
pub fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
pub fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
let mut logging_opts = LoggingOptions::default();
if let Some(dir) = top_level_opts.log_dir {
logging_opts.dir = dir;
}
if top_level_opts.log_level.is_some() {
logging_opts.level = top_level_opts.log_level;
if let Some(dir) = &cli_options.log_dir {
logging_opts.dir = dir.clone();
}
logging_opts.level = cli_options.log_level.clone();
Ok(Options::Cli(Box::new(logging_opts)))
}
}
@@ -110,7 +125,6 @@ pub(crate) struct AttachCommand {
impl AttachCommand {
#[allow(dead_code)]
async fn build(self) -> Result<Instance> {
let repl = Repl::try_new(&self).await?;
Ok(Instance::Repl(repl))
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}

View File

@@ -69,7 +69,7 @@ impl BenchTableMetadataCommand {
table_metadata_manager,
count: self.count,
};
Ok(Instance::Tool(Box::new(tool)))
Ok(Instance::new(Box::new(tool)))
}
}

View File

@@ -105,7 +105,7 @@ impl ExportCommand {
}));
}
Ok(Instance::Tool(Box::new(Export {
Ok(Instance::new(Box::new(Export {
client: database_client,
catalog,
schema,

View File

@@ -76,7 +76,7 @@ impl UpgradeCommand {
skip_schema_keys: self.skip_schema_keys,
skip_table_route_keys: self.skip_table_route_keys,
};
Ok(Instance::Tool(Box::new(tool)))
Ok(Instance::new(Box::new(tool)))
}
}

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_telemetry::logging;
@@ -25,14 +26,26 @@ use servers::Mode;
use snafu::{OptionExt, ResultExt};
use crate::error::{MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu};
use crate::options::{Options, TopLevelOptions};
use crate::options::{CliOptions, Options};
use crate::App;
pub struct Instance {
datanode: Datanode,
}
impl Instance {
pub async fn start(&mut self) -> Result<()> {
fn new(datanode: Datanode) -> Self {
Self { datanode }
}
}
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-datanode"
}
async fn start(&mut self) -> Result<()> {
plugins::start_datanode_plugins(self.datanode.plugins())
.await
.context(StartDatanodeSnafu)?;
@@ -40,7 +53,7 @@ impl Instance {
self.datanode.start().await.context(StartDatanodeSnafu)
}
pub async fn stop(&self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.datanode
.shutdown()
.await
@@ -59,8 +72,8 @@ impl Command {
self.subcmd.build(opts).await
}
pub fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
self.subcmd.load_options(top_level_opts)
pub fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
self.subcmd.load_options(cli_options)
}
}
@@ -76,9 +89,9 @@ impl SubCommand {
}
}
fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
match self {
SubCommand::Start(cmd) => cmd.load_options(top_level_opts),
SubCommand::Start(cmd) => cmd.load_options(cli_options),
}
}
}
@@ -108,19 +121,19 @@ struct StartCommand {
}
impl StartCommand {
fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
let mut opts: DatanodeOptions = Options::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
DatanodeOptions::env_list_keys(),
)?;
if let Some(dir) = top_level_opts.log_dir {
opts.logging.dir = dir;
if let Some(dir) = &cli_options.log_dir {
opts.logging.dir = dir.clone();
}
if top_level_opts.log_level.is_some() {
opts.logging.level = top_level_opts.log_level;
if cli_options.log_level.is_some() {
opts.logging.level = cli_options.log_level.clone();
}
if let Some(addr) = &self.rpc_addr {
@@ -204,7 +217,7 @@ impl StartCommand {
.await
.context(StartDatanodeSnafu)?;
Ok(Instance { datanode })
Ok(Instance::new(datanode))
}
}
@@ -219,7 +232,7 @@ mod tests {
use servers::Mode;
use super::*;
use crate::options::ENV_VAR_SEP;
use crate::options::{CliOptions, ENV_VAR_SEP};
#[test]
fn test_read_from_config_file() {
@@ -274,8 +287,7 @@ mod tests {
..Default::default()
};
let Options::Datanode(options) = cmd.load_options(TopLevelOptions::default()).unwrap()
else {
let Options::Datanode(options) = cmd.load_options(&CliOptions::default()).unwrap() else {
unreachable!()
};
@@ -331,7 +343,7 @@ mod tests {
#[test]
fn test_try_from_cmd() {
if let Options::Datanode(opt) = StartCommand::default()
.load_options(TopLevelOptions::default())
.load_options(&CliOptions::default())
.unwrap()
{
assert_eq!(Mode::Standalone, opt.mode)
@@ -342,7 +354,7 @@ mod tests {
metasrv_addr: Some(vec!["127.0.0.1:3002".to_string()]),
..Default::default()
})
.load_options(TopLevelOptions::default())
.load_options(&CliOptions::default())
.unwrap()
{
assert_eq!(Mode::Distributed, opt.mode)
@@ -352,7 +364,7 @@ mod tests {
metasrv_addr: Some(vec!["127.0.0.1:3002".to_string()]),
..Default::default()
})
.load_options(TopLevelOptions::default())
.load_options(&CliOptions::default())
.is_err());
// Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value
@@ -360,18 +372,21 @@ mod tests {
node_id: Some(42),
..Default::default()
})
.load_options(TopLevelOptions::default())
.load_options(&CliOptions::default())
.is_ok());
}
#[test]
fn test_top_level_options() {
fn test_load_log_options_from_cli() {
let cmd = StartCommand::default();
let options = cmd
.load_options(TopLevelOptions {
.load_options(&CliOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
})
.unwrap();
@@ -454,8 +469,7 @@ mod tests {
..Default::default()
};
let Options::Datanode(opts) =
command.load_options(TopLevelOptions::default()).unwrap()
let Options::Datanode(opts) = command.load_options(&CliOptions::default()).unwrap()
else {
unreachable!()
};

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::CachedMetaKvBackend;
use clap::Parser;
use client::client_manager::DatanodeClients;
@@ -32,14 +33,26 @@ use servers::Mode;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, MissingConfigSnafu, Result, StartFrontendSnafu};
use crate::options::{Options, TopLevelOptions};
use crate::options::{CliOptions, Options};
use crate::App;
pub struct Instance {
frontend: FeInstance,
}
impl Instance {
pub async fn start(&mut self) -> Result<()> {
fn new(frontend: FeInstance) -> Self {
Self { frontend }
}
}
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-frontend"
}
async fn start(&mut self) -> Result<()> {
plugins::start_frontend_plugins(self.frontend.plugins().clone())
.await
.context(StartFrontendSnafu)?;
@@ -47,7 +60,7 @@ impl Instance {
self.frontend.start().await.context(StartFrontendSnafu)
}
pub async fn stop(&self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.frontend
.shutdown()
.await
@@ -66,8 +79,8 @@ impl Command {
self.subcmd.build(opts).await
}
pub fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
self.subcmd.load_options(top_level_opts)
pub fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
self.subcmd.load_options(cli_options)
}
}
@@ -83,9 +96,9 @@ impl SubCommand {
}
}
fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
match self {
SubCommand::Start(cmd) => cmd.load_options(top_level_opts),
SubCommand::Start(cmd) => cmd.load_options(cli_options),
}
}
}
@@ -125,19 +138,19 @@ pub struct StartCommand {
}
impl StartCommand {
fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
let mut opts: FrontendOptions = Options::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
FrontendOptions::env_list_keys(),
)?;
if let Some(dir) = top_level_opts.log_dir {
opts.logging.dir = dir;
if let Some(dir) = &cli_options.log_dir {
opts.logging.dir = dir.clone();
}
if top_level_opts.log_level.is_some() {
opts.logging.level = top_level_opts.log_level;
if cli_options.log_level.is_some() {
opts.logging.level = cli_options.log_level.clone();
}
let tls_opts = TlsOption::new(
@@ -241,7 +254,7 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
Ok(Instance { frontend: instance })
Ok(Instance::new(instance))
}
}
@@ -257,7 +270,7 @@ mod tests {
use servers::http::HttpOptions;
use super::*;
use crate::options::ENV_VAR_SEP;
use crate::options::{CliOptions, ENV_VAR_SEP};
#[test]
fn test_try_from_start_command() {
@@ -271,8 +284,7 @@ mod tests {
..Default::default()
};
let Options::Frontend(opts) = command.load_options(TopLevelOptions::default()).unwrap()
else {
let Options::Frontend(opts) = command.load_options(&CliOptions::default()).unwrap() else {
unreachable!()
};
@@ -324,7 +336,7 @@ mod tests {
..Default::default()
};
let Options::Frontend(fe_opts) = command.load_options(TopLevelOptions::default()).unwrap()
let Options::Frontend(fe_opts) = command.load_options(&CliOptions::default()).unwrap()
else {
unreachable!()
};
@@ -363,16 +375,19 @@ mod tests {
}
#[test]
fn test_top_level_options() {
fn test_load_log_options_from_cli() {
let cmd = StartCommand {
disable_dashboard: Some(false),
..Default::default()
};
let options = cmd
.load_options(TopLevelOptions {
.load_options(&CliOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
})
.unwrap();
@@ -452,11 +467,8 @@ mod tests {
..Default::default()
};
let top_level_opts = TopLevelOptions {
log_dir: None,
log_level: Some("error".to_string()),
};
let Options::Frontend(fe_opts) = command.load_options(top_level_opts).unwrap()
let Options::Frontend(fe_opts) =
command.load_options(&CliOptions::default()).unwrap()
else {
unreachable!()
};

View File

@@ -14,6 +14,10 @@
#![feature(assert_matches)]
use async_trait::async_trait;
use clap::arg;
use common_telemetry::{error, info};
pub mod cli;
pub mod datanode;
pub mod error;
@@ -21,3 +25,100 @@ pub mod frontend;
pub mod metasrv;
pub mod options;
pub mod standalone;
lazy_static::lazy_static! {
static ref APP_VERSION: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("app_version", "app version", &["short_version", "version"]).unwrap();
}
#[async_trait]
pub trait App {
fn name(&self) -> &str;
async fn start(&mut self) -> error::Result<()>;
async fn stop(&self) -> error::Result<()>;
}
pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
let name = app.name().to_string();
tokio::select! {
result = app.start() => {
if let Err(err) = result {
error!(err; "Failed to start app {name}!");
}
}
_ = tokio::signal::ctrl_c() => {
if let Err(err) = app.stop().await {
error!(err; "Failed to stop app {name}!");
}
info!("Goodbye!");
}
}
Ok(())
}
pub fn log_versions() {
// Report app version as gauge.
APP_VERSION
.with_label_values(&[short_version(), full_version()])
.inc();
// Log version and argument flags.
info!(
"short_version: {}, full_version: {}",
short_version(),
full_version()
);
log_env_flags();
}
pub fn greptimedb_cli() -> clap::Command {
let cmd = clap::Command::new("greptimedb")
.version(print_version())
.subcommand_required(true);
#[cfg(feature = "tokio-console")]
let cmd = cmd.arg(arg!(--"tokio-console-addr"[TOKIO_CONSOLE_ADDR]));
cmd.args([arg!(--"log-dir"[LOG_DIR]), arg!(--"log-level"[LOG_LEVEL])])
}
fn print_version() -> &'static str {
concat!(
"\nbranch: ",
env!("GIT_BRANCH"),
"\ncommit: ",
env!("GIT_COMMIT"),
"\ndirty: ",
env!("GIT_DIRTY"),
"\nversion: ",
env!("CARGO_PKG_VERSION")
)
}
fn short_version() -> &'static str {
env!("CARGO_PKG_VERSION")
}
// {app_name}-{branch_name}-{commit_short}
// The branch name (tag) of a release build should already contain the short
// version so the full version doesn't concat the short version explicitly.
fn full_version() -> &'static str {
concat!(
"greptimedb-",
env!("GIT_BRANCH"),
"-",
env!("GIT_COMMIT_SHORT")
)
}
fn log_env_flags() {
info!("command line arguments");
for argument in std::env::args() {
info!("argument: {}", argument);
}
}

View File

@@ -14,6 +14,7 @@
use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_telemetry::logging;
use meta_srv::bootstrap::MetaSrvInstance;
@@ -21,21 +22,34 @@ use meta_srv::metasrv::MetaSrvOptions;
use snafu::ResultExt;
use crate::error::{self, Result, StartMetaServerSnafu};
use crate::options::{Options, TopLevelOptions};
use crate::options::{CliOptions, Options};
use crate::App;
pub struct Instance {
instance: MetaSrvInstance,
}
impl Instance {
pub async fn start(&mut self) -> Result<()> {
fn new(instance: MetaSrvInstance) -> Self {
Self { instance }
}
}
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-metasrv"
}
async fn start(&mut self) -> Result<()> {
plugins::start_meta_srv_plugins(self.instance.plugins())
.await
.context(StartMetaServerSnafu)?;
self.instance.start().await.context(StartMetaServerSnafu)
}
pub async fn stop(&self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.instance
.shutdown()
.await
@@ -54,8 +68,8 @@ impl Command {
self.subcmd.build(opts).await
}
pub fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
self.subcmd.load_options(top_level_opts)
pub fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
self.subcmd.load_options(cli_options)
}
}
@@ -71,9 +85,9 @@ impl SubCommand {
}
}
fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
match self {
SubCommand::Start(cmd) => cmd.load_options(top_level_opts),
SubCommand::Start(cmd) => cmd.load_options(cli_options),
}
}
}
@@ -106,19 +120,19 @@ struct StartCommand {
}
impl StartCommand {
fn load_options(&self, top_level_opts: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
let mut opts: MetaSrvOptions = Options::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
None,
)?;
if let Some(dir) = top_level_opts.log_dir {
opts.logging.dir = dir;
if let Some(dir) = &cli_options.log_dir {
opts.logging.dir = dir.clone();
}
if top_level_opts.log_level.is_some() {
opts.logging.level = top_level_opts.log_level;
if cli_options.log_level.is_some() {
opts.logging.level = cli_options.log_level.clone();
}
if let Some(addr) = &self.bind_addr {
@@ -182,7 +196,7 @@ impl StartCommand {
.await
.context(error::BuildMetaServerSnafu)?;
Ok(Instance { instance })
Ok(Instance::new(instance))
}
}
@@ -206,8 +220,7 @@ mod tests {
..Default::default()
};
let Options::Metasrv(options) = cmd.load_options(TopLevelOptions::default()).unwrap()
else {
let Options::Metasrv(options) = cmd.load_options(&CliOptions::default()).unwrap() else {
unreachable!()
};
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
@@ -242,8 +255,7 @@ mod tests {
..Default::default()
};
let Options::Metasrv(options) = cmd.load_options(TopLevelOptions::default()).unwrap()
else {
let Options::Metasrv(options) = cmd.load_options(&CliOptions::default()).unwrap() else {
unreachable!()
};
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
@@ -274,7 +286,7 @@ mod tests {
}
#[test]
fn test_top_level_options() {
fn test_load_log_options_from_cli() {
let cmd = StartCommand {
bind_addr: Some("127.0.0.1:3002".to_string()),
server_addr: Some("127.0.0.1:3002".to_string()),
@@ -284,9 +296,12 @@ mod tests {
};
let options = cmd
.load_options(TopLevelOptions {
.load_options(&CliOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
})
.unwrap();
@@ -345,8 +360,7 @@ mod tests {
..Default::default()
};
let Options::Metasrv(opts) =
command.load_options(TopLevelOptions::default()).unwrap()
let Options::Metasrv(opts) = command.load_options(&CliOptions::default()).unwrap()
else {
unreachable!()
};

View File

@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use clap::ArgMatches;
use common_config::KvBackendConfig;
use common_telemetry::logging::LoggingOptions;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
use frontend::error::{Result as FeResult, TomlFormatSnafu};
@@ -28,7 +29,7 @@ pub const ENV_VAR_SEP: &str = "__";
pub const ENV_LIST_SEP: &str = ",";
/// Options mixed up from datanode, frontend and metasrv.
#[derive(Serialize, Debug)]
#[derive(Serialize, Debug, Clone)]
pub struct MixOptions {
pub data_home: String,
pub procedure: ProcedureConfig,
@@ -58,10 +59,32 @@ pub enum Options {
Cli(Box<LoggingOptions>),
}
#[derive(Clone, Debug, Default)]
pub struct TopLevelOptions {
#[derive(Default)]
pub struct CliOptions {
pub log_dir: Option<String>,
pub log_level: Option<String>,
#[cfg(feature = "tokio-console")]
pub tokio_console_addr: Option<String>,
}
impl CliOptions {
pub fn new(args: &ArgMatches) -> Self {
Self {
log_dir: args.get_one::<String>("log-dir").cloned(),
log_level: args.get_one::<String>("log-level").cloned(),
#[cfg(feature = "tokio-console")]
tokio_console_addr: args.get_one::<String>("tokio-console-addr").cloned(),
}
}
pub fn tracing_options(&self) -> TracingOptions {
TracingOptions {
#[cfg(feature = "tokio-console")]
tokio_console_addr: self.tokio_console_addr.clone(),
}
}
}
impl Options {

View File

@@ -15,11 +15,12 @@
use std::sync::Arc;
use std::{fs, path};
use async_trait::async_trait;
use clap::Parser;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -49,7 +50,8 @@ use crate::error::{
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
use crate::options::{MixOptions, Options, TopLevelOptions};
use crate::options::{CliOptions, MixOptions, Options};
use crate::App;
#[derive(Parser)]
pub struct Command {
@@ -62,8 +64,8 @@ impl Command {
self.subcmd.build(opts).await
}
pub fn load_options(&self, top_level_options: TopLevelOptions) -> Result<Options> {
self.subcmd.load_options(top_level_options)
pub fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
self.subcmd.load_options(cli_options)
}
}
@@ -79,9 +81,9 @@ impl SubCommand {
}
}
fn load_options(&self, top_level_options: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
match self {
SubCommand::Start(cmd) => cmd.load_options(top_level_options),
SubCommand::Start(cmd) => cmd.load_options(cli_options),
}
}
}
@@ -171,8 +173,13 @@ pub struct Instance {
procedure_manager: ProcedureManagerRef,
}
impl Instance {
pub async fn start(&mut self) -> Result<()> {
#[async_trait]
impl App for Instance {
fn name(&self) -> &str {
"greptime-standalone"
}
async fn start(&mut self) -> Result<()> {
self.datanode.start_telemetry();
self.procedure_manager
@@ -184,7 +191,7 @@ impl Instance {
Ok(())
}
pub async fn stop(&self) -> Result<()> {
async fn stop(&self) -> Result<()> {
self.frontend
.shutdown()
.await
@@ -206,7 +213,7 @@ impl Instance {
}
#[derive(Debug, Default, Parser)]
struct StartCommand {
pub struct StartCommand {
#[clap(long)]
http_addr: Option<String>,
#[clap(long)]
@@ -220,7 +227,7 @@ struct StartCommand {
#[clap(short, long)]
influxdb_enable: bool,
#[clap(short, long)]
config_file: Option<String>,
pub config_file: Option<String>,
#[clap(long)]
tls_mode: Option<TlsMode>,
#[clap(long)]
@@ -230,14 +237,14 @@ struct StartCommand {
#[clap(long)]
user_provider: Option<String>,
#[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
env_prefix: String,
pub env_prefix: String,
/// The working home directory of this standalone instance.
#[clap(long)]
data_home: Option<String>,
}
impl StartCommand {
fn load_options(&self, top_level_options: TopLevelOptions) -> Result<Options> {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
let mut opts: StandaloneOptions = Options::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
@@ -246,12 +253,12 @@ impl StartCommand {
opts.mode = Mode::Standalone;
if let Some(dir) = top_level_options.log_dir {
opts.logging.dir = dir;
if let Some(dir) = &cli_options.log_dir {
opts.logging.dir = dir.clone();
}
if top_level_options.log_level.is_some() {
opts.logging.level = top_level_options.log_level;
if cli_options.log_level.is_some() {
opts.logging.level = cli_options.log_level.clone();
}
let tls_opts = TlsOption::new(
@@ -357,10 +364,14 @@ impl StartCommand {
let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
let table_meta_allocator =
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone()));
let ddl_task_executor = Self::create_ddl_task_executor(
kv_backend.clone(),
procedure_manager.clone(),
datanode_manager.clone(),
table_meta_allocator,
)
.await?;
@@ -382,10 +393,11 @@ impl StartCommand {
})
}
async fn create_ddl_task_executor(
pub async fn create_ddl_task_executor(
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Result<DdlTaskExecutorRef> {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
@@ -396,7 +408,7 @@ impl StartCommand {
datanode_manager,
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(StandaloneTableMetadataCreator::new(kv_backend)),
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
)
.context(InitDdlManagerSnafu)?,
@@ -432,7 +444,7 @@ mod tests {
use servers::Mode;
use super::*;
use crate::options::ENV_VAR_SEP;
use crate::options::{CliOptions, ENV_VAR_SEP};
#[tokio::test]
async fn test_try_from_start_command_to_anymap() {
@@ -515,8 +527,7 @@ mod tests {
..Default::default()
};
let Options::Standalone(options) = cmd.load_options(TopLevelOptions::default()).unwrap()
else {
let Options::Standalone(options) = cmd.load_options(&CliOptions::default()).unwrap() else {
unreachable!()
};
let fe_opts = options.frontend;
@@ -561,16 +572,19 @@ mod tests {
}
#[test]
fn test_top_level_options() {
fn test_load_log_options_from_cli() {
let cmd = StartCommand {
user_provider: Some("static_user_provider:cmd:test=test".to_string()),
..Default::default()
};
let Options::Standalone(opts) = cmd
.load_options(TopLevelOptions {
.load_options(&CliOptions {
log_dir: Some("/tmp/greptimedb/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
})
.unwrap()
else {
@@ -637,11 +651,8 @@ mod tests {
..Default::default()
};
let top_level_opts = TopLevelOptions {
log_dir: None,
log_level: None,
};
let Options::Standalone(opts) = command.load_options(top_level_opts).unwrap()
let Options::Standalone(opts) =
command.load_options(&CliOptions::default()).unwrap()
else {
unreachable!()
};

View File

@@ -399,9 +399,20 @@ impl RegionServerInner {
}
async fn stop(&self) -> Result<()> {
for region in self.region_map.iter() {
let region_id = *region.key();
let engine = region.value();
// Calling async functions while iterating inside the Dashmap could easily cause the Rust
// complains "higher-ranked lifetime error". Rust can't prove some future is legit.
// Possible related issue: https://github.com/rust-lang/rust/issues/102211
//
// The walkaround is to put the async functions in the `common_runtime::spawn_bg`. Or like
// it here, collect the values first then use later separately.
let regions = self
.region_map
.iter()
.map(|x| (*x.key(), x.value().clone()))
.collect::<Vec<_>>();
for (region_id, engine) in regions {
let closed = engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await;