diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index b532c4d088..e16dc6a559 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -30,9 +30,39 @@ struct Command { subcmd: SubCommand, } +pub enum Application { + Datanode(datanode::Instance), + Frontend(frontend::Instance), + Metasrv(metasrv::Instance), + Standalone(standalone::Instance), + Cli(cli::Instance), +} + +impl Application { + async fn run(&mut self) -> Result<()> { + match self { + Application::Datanode(instance) => instance.run().await, + Application::Frontend(instance) => instance.run().await, + Application::Metasrv(instance) => instance.run().await, + Application::Standalone(instance) => instance.run().await, + Application::Cli(instance) => instance.run().await, + } + } + + async fn stop(&self) -> Result<()> { + match self { + Application::Datanode(instance) => instance.stop().await, + Application::Frontend(instance) => instance.stop().await, + Application::Metasrv(instance) => instance.stop().await, + Application::Standalone(instance) => instance.stop().await, + Application::Cli(instance) => instance.stop().await, + } + } +} + impl Command { - async fn run(self) -> Result<()> { - self.subcmd.run().await + async fn build(self) -> Result { + self.subcmd.build().await } } @@ -51,13 +81,28 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Datanode(cmd) => cmd.run().await, - SubCommand::Frontend(cmd) => cmd.run().await, - SubCommand::Metasrv(cmd) => cmd.run().await, - SubCommand::Standalone(cmd) => cmd.run().await, - SubCommand::Cli(cmd) => cmd.run().await, + SubCommand::Datanode(cmd) => { + let app = cmd.build().await?; + Ok(Application::Datanode(app)) + } + SubCommand::Frontend(cmd) => { + let app = cmd.build().await?; + Ok(Application::Frontend(app)) + } + SubCommand::Metasrv(cmd) => { + let app = cmd.build().await?; + Ok(Application::Metasrv(app)) + } + SubCommand::Standalone(cmd) => { + let app = cmd.build().await?; + Ok(Application::Standalone(app)) + } + SubCommand::Cli(cmd) => { + let app = cmd.build().await?; + Ok(Application::Cli(app)) + } } } } @@ -104,13 +149,18 @@ async fn main() -> Result<()> { common_telemetry::init_default_metrics_recorder(); let _guard = common_telemetry::init_global_logging(app_name, log_dir, log_level, false); + let mut app = cmd.build().await?; + tokio::select! { - result = cmd.run() => { + result = app.run() => { if let Err(err) = result { error!(err; "Fatal error occurs!"); } } _ = tokio::signal::ctrl_c() => { + if let Err(err) = app.stop().await { + error!(err; "Fatal error occurs!"); + } info!("Goodbye!"); } } diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index dbabaf8e60..0563dcc120 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -17,10 +17,25 @@ mod helper; mod repl; use clap::Parser; -use repl::Repl; +pub use repl::Repl; use crate::error::Result; +pub struct Instance { + repl: Repl, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + self.repl.run().await + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle cli shutdown + Ok(()) + } +} + #[derive(Parser)] pub struct Command { #[clap(subcommand)] @@ -28,8 +43,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.cmd.run().await + pub async fn build(self) -> Result { + self.cmd.build().await } } @@ -39,9 +54,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Attach(cmd) => cmd.run().await, + SubCommand::Attach(cmd) => cmd.build().await, } } } @@ -57,8 +72,8 @@ pub(crate) struct AttachCommand { } impl AttachCommand { - async fn run(self) -> Result<()> { - let mut repl = Repl::try_new(&self).await?; - repl.run().await + async fn build(self) -> Result { + let repl = Repl::try_new(&self).await?; + Ok(Instance { repl }) } } diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 144f3ac410..aa77df8e22 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -50,7 +50,7 @@ use crate::error::{ }; /// Captures the state of the repl, gathers commands and executes them one by one -pub(crate) struct Repl { +pub struct Repl { /// Rustyline editor for interacting with user on command line rl: Editor, diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 405dc36f5d..1aa095e21b 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -24,6 +24,21 @@ use snafu::ResultExt; use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu}; use crate::toml_loader; +pub struct Instance { + datanode: Datanode, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + self.datanode.start().await.context(StartDatanodeSnafu) + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle datanode shutdown + Ok(()) + } +} + #[derive(Parser)] pub struct Command { #[clap(subcommand)] @@ -31,8 +46,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.subcmd.run().await + pub async fn build(self) -> Result { + self.subcmd.build().await } } @@ -42,9 +57,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Start(cmd) => cmd.run().await, + SubCommand::Start(cmd) => cmd.build().await, } } } @@ -72,19 +87,16 @@ struct StartCommand { } impl StartCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { logging::info!("Datanode start command: {:#?}", self); let opts: DatanodeOptions = self.try_into()?; logging::info!("Datanode options: {:#?}", opts); - Datanode::new(opts) - .await - .context(StartDatanodeSnafu)? - .start() - .await - .context(StartDatanodeSnafu) + let datanode = Datanode::new(opts).await.context(StartDatanodeSnafu)?; + + Ok(Instance { datanode }) } } diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 9a7c083c75..a4b42c7fda 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -32,6 +32,12 @@ pub enum Error { source: frontend::error::Error, }, + #[snafu(display("Failed to build meta server, source: {}", source))] + BuildMetaServer { + #[snafu(backtrace)] + source: meta_srv::error::Error, + }, + #[snafu(display("Failed to start meta server, source: {}", source))] StartMetaServer { #[snafu(backtrace)] @@ -138,6 +144,7 @@ impl ErrorExt for Error { Error::StartDatanode { source } => source.status_code(), Error::StartFrontend { source } => source.status_code(), Error::StartMetaServer { source } => source.status_code(), + Error::BuildMetaServer { source } => source.status_code(), Error::UnsupportedSelectorType { source, .. } => source.status_code(), Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => { StatusCode::InvalidArguments diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index c943cba994..b98fdda97d 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -19,7 +19,7 @@ use common_base::Plugins; use frontend::frontend::FrontendOptions; use frontend::grpc::GrpcOptions; use frontend::influxdb::InfluxdbOptions; -use frontend::instance::{FrontendInstance, Instance}; +use frontend::instance::{FrontendInstance, Instance as FeInstance}; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; @@ -34,6 +34,24 @@ use snafu::ResultExt; use crate::error::{self, IllegalAuthConfigSnafu, Result}; use crate::toml_loader; +pub struct Instance { + frontend: FeInstance, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + self.frontend + .start() + .await + .context(error::StartFrontendSnafu) + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle frontend shutdown + Ok(()) + } +} + #[derive(Parser)] pub struct Command { #[clap(subcommand)] @@ -41,8 +59,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.subcmd.run().await + pub async fn build(self) -> Result { + self.subcmd.build().await } } @@ -52,9 +70,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Start(cmd) => cmd.run().await, + SubCommand::Start(cmd) => cmd.build().await, } } } @@ -90,11 +108,11 @@ pub struct StartCommand { } impl StartCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); let opts: FrontendOptions = self.try_into()?; - let mut instance = Instance::try_new_distributed(&opts, plugins.clone()) + let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone()) .await .context(error::StartFrontendSnafu)?; @@ -103,7 +121,7 @@ impl StartCommand { .await .context(error::StartFrontendSnafu)?; - instance.start().await.context(error::StartFrontendSnafu) + Ok(Instance { frontend: instance }) } } diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 8151d3b3f4..ab46f6f09b 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -14,13 +14,32 @@ use clap::Parser; use common_telemetry::{info, logging, warn}; -use meta_srv::bootstrap; +use meta_srv::bootstrap::MetaSrvInstance; use meta_srv::metasrv::MetaSrvOptions; use snafu::ResultExt; use crate::error::{Error, Result}; use crate::{error, toml_loader}; +pub struct Instance { + instance: MetaSrvInstance, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + self.instance + .start() + .await + .context(error::StartMetaServerSnafu)?; + Ok(()) + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle metasrv shutdown + Ok(()) + } +} + #[derive(Parser)] pub struct Command { #[clap(subcommand)] @@ -28,8 +47,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.subcmd.run().await + pub async fn build(self) -> Result { + self.subcmd.build().await } } @@ -39,9 +58,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Start(cmd) => cmd.run().await, + SubCommand::Start(cmd) => cmd.build().await, } } } @@ -63,16 +82,17 @@ struct StartCommand { } impl StartCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { logging::info!("MetaSrv start command: {:#?}", self); let opts: MetaSrvOptions = self.try_into()?; logging::info!("MetaSrv options: {:#?}", opts); - - bootstrap::bootstrap_meta_srv(opts) + let instance = MetaSrvInstance::new(opts) .await - .context(error::StartMetaServerSnafu) + .context(error::BuildMetaServerSnafu)?; + + Ok(Instance { instance }) } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index d046028709..1d53ca7b0f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -47,8 +47,8 @@ pub struct Command { } impl Command { - pub async fn run(self) -> Result<()> { - self.subcmd.run().await + pub async fn build(self) -> Result { + self.subcmd.build().await } } @@ -58,9 +58,9 @@ enum SubCommand { } impl SubCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { match self { - SubCommand::Start(cmd) => cmd.run().await, + SubCommand::Start(cmd) => cmd.build().await, } } } @@ -133,6 +133,30 @@ impl StandaloneOptions { } } +pub struct Instance { + datanode: Datanode, + frontend: FeInstance, +} + +impl Instance { + pub async fn run(&mut self) -> Result<()> { + // Start datanode instance before starting services, to avoid requests come in before internal components are started. + self.datanode + .start_instance() + .await + .context(StartDatanodeSnafu)?; + info!("Datanode instance started"); + + self.frontend.start().await.context(StartFrontendSnafu)?; + Ok(()) + } + + pub async fn stop(&self) -> Result<()> { + // TODO: handle standalone shutdown + Ok(()) + } +} + #[derive(Debug, Parser)] struct StartCommand { #[clap(long)] @@ -164,7 +188,7 @@ struct StartCommand { } impl StartCommand { - async fn run(self) -> Result<()> { + async fn build(self) -> Result { let enable_memory_catalog = self.enable_memory_catalog; let config_file = self.config_file.clone(); let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); @@ -184,25 +208,18 @@ impl StartCommand { fe_opts, dn_opts ); - let mut datanode = Datanode::new(dn_opts.clone()) + let datanode = Datanode::new(dn_opts.clone()) .await .context(StartDatanodeSnafu)?; - let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?; - // Start datanode instance before starting services, to avoid requests come in before internal components are started. - datanode - .start_instance() - .await - .context(StartDatanodeSnafu)?; - info!("Datanode instance started"); + let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?; frontend .build_servers(&fe_opts, plugins) .await .context(StartFrontendSnafu)?; - frontend.start().await.context(StartFrontendSnafu)?; - Ok(()) + Ok(Instance { datanode, frontend }) } } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 7749470019..c075b12ec9 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -39,18 +39,45 @@ use crate::service::store::kv::ResettableKvStoreRef; use crate::service::store::memory::MemStore; use crate::{error, Result}; -// Bootstrap the rpc server to serve incoming request -pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> { - let meta_srv = make_meta_srv(opts.clone()).await?; - bootstrap_meta_srv_with_router(opts, router(meta_srv)).await +#[derive(Clone)] +pub struct MetaSrvInstance { + meta_srv: MetaSrv, + + opts: MetaSrvOptions, } -pub async fn bootstrap_meta_srv_with_router(opts: MetaSrvOptions, router: Router) -> Result<()> { - let listener = TcpListener::bind(&opts.bind_addr) +impl MetaSrvInstance { + pub async fn new(opts: MetaSrvOptions) -> Result { + let meta_srv = build_meta_srv(&opts).await?; + + Ok(MetaSrvInstance { meta_srv, opts }) + } + + pub async fn start(&self) -> Result<()> { + self.meta_srv.start().await; + bootstrap_meta_srv_with_router(&self.opts.bind_addr, router(self.meta_srv.clone())).await?; + + Ok(()) + } + + pub async fn close(&self) -> Result<()> { + // TODO: shutdown the router + self.meta_srv.shutdown(); + + Ok(()) + } +} + +// Bootstrap the rpc server to serve incoming request +pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> { + let meta_srv = make_meta_srv(&opts).await?; + bootstrap_meta_srv_with_router(&opts.bind_addr, router(meta_srv)).await +} + +pub async fn bootstrap_meta_srv_with_router(bind_addr: &str, router: Router) -> Result<()> { + let listener = TcpListener::bind(bind_addr) .await - .context(error::TcpBindSnafu { - addr: &opts.bind_addr, - })?; + .context(error::TcpBindSnafu { addr: bind_addr })?; let listener = TcpListenerStream::new(listener); router @@ -72,7 +99,7 @@ pub fn router(meta_srv: MetaSrv) -> Router { .add_service(admin::make_admin_service(meta_srv)) } -pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { +pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { let (kv_store, election, lock) = if opts.use_memory_store { (Arc::new(MemStore::new()) as _, None, None) } else { @@ -107,7 +134,7 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { }; let meta_srv = MetaSrvBuilder::new() - .options(opts) + .options(opts.clone()) .kv_store(kv_store) .in_memory(in_memory) .selector(selector) @@ -117,6 +144,12 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { .build() .await; + Ok(meta_srv) +} + +pub async fn make_meta_srv(opts: &MetaSrvOptions) -> Result { + let meta_srv = build_meta_srv(opts).await?; + meta_srv.start().await; Ok(meta_srv)