refactor: make the cmd hold the application instance (#1159)

This commit is contained in:
Weny Xu
2023-03-14 15:18:50 +08:00
committed by GitHub
parent 81ca1d8399
commit cdc111b607
9 changed files with 244 additions and 72 deletions

View File

@@ -30,9 +30,39 @@ struct Command {
subcmd: SubCommand, subcmd: SubCommand,
} }
pub enum Application {
Datanode(datanode::Instance),
Frontend(frontend::Instance),
Metasrv(metasrv::Instance),
Standalone(standalone::Instance),
Cli(cli::Instance),
}
impl Application {
async fn run(&mut self) -> Result<()> {
match self {
Application::Datanode(instance) => instance.run().await,
Application::Frontend(instance) => instance.run().await,
Application::Metasrv(instance) => instance.run().await,
Application::Standalone(instance) => instance.run().await,
Application::Cli(instance) => instance.run().await,
}
}
async fn stop(&self) -> Result<()> {
match self {
Application::Datanode(instance) => instance.stop().await,
Application::Frontend(instance) => instance.stop().await,
Application::Metasrv(instance) => instance.stop().await,
Application::Standalone(instance) => instance.stop().await,
Application::Cli(instance) => instance.stop().await,
}
}
}
impl Command { impl Command {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Application> {
self.subcmd.run().await self.subcmd.build().await
} }
} }
@@ -51,13 +81,28 @@ enum SubCommand {
} }
impl SubCommand { impl SubCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Application> {
match self { match self {
SubCommand::Datanode(cmd) => cmd.run().await, SubCommand::Datanode(cmd) => {
SubCommand::Frontend(cmd) => cmd.run().await, let app = cmd.build().await?;
SubCommand::Metasrv(cmd) => cmd.run().await, Ok(Application::Datanode(app))
SubCommand::Standalone(cmd) => cmd.run().await, }
SubCommand::Cli(cmd) => cmd.run().await, SubCommand::Frontend(cmd) => {
let app = cmd.build().await?;
Ok(Application::Frontend(app))
}
SubCommand::Metasrv(cmd) => {
let app = cmd.build().await?;
Ok(Application::Metasrv(app))
}
SubCommand::Standalone(cmd) => {
let app = cmd.build().await?;
Ok(Application::Standalone(app))
}
SubCommand::Cli(cmd) => {
let app = cmd.build().await?;
Ok(Application::Cli(app))
}
} }
} }
} }
@@ -104,13 +149,18 @@ async fn main() -> Result<()> {
common_telemetry::init_default_metrics_recorder(); common_telemetry::init_default_metrics_recorder();
let _guard = common_telemetry::init_global_logging(app_name, log_dir, log_level, false); let _guard = common_telemetry::init_global_logging(app_name, log_dir, log_level, false);
let mut app = cmd.build().await?;
tokio::select! { tokio::select! {
result = cmd.run() => { result = app.run() => {
if let Err(err) = result { if let Err(err) = result {
error!(err; "Fatal error occurs!"); error!(err; "Fatal error occurs!");
} }
} }
_ = tokio::signal::ctrl_c() => { _ = tokio::signal::ctrl_c() => {
if let Err(err) = app.stop().await {
error!(err; "Fatal error occurs!");
}
info!("Goodbye!"); info!("Goodbye!");
} }
} }

View File

@@ -17,10 +17,25 @@ mod helper;
mod repl; mod repl;
use clap::Parser; use clap::Parser;
use repl::Repl; pub use repl::Repl;
use crate::error::Result; use crate::error::Result;
pub struct Instance {
repl: Repl,
}
impl Instance {
pub async fn run(&mut self) -> Result<()> {
self.repl.run().await
}
pub async fn stop(&self) -> Result<()> {
// TODO: handle cli shutdown
Ok(())
}
}
#[derive(Parser)] #[derive(Parser)]
pub struct Command { pub struct Command {
#[clap(subcommand)] #[clap(subcommand)]
@@ -28,8 +43,8 @@ pub struct Command {
} }
impl Command { impl Command {
pub async fn run(self) -> Result<()> { pub async fn build(self) -> Result<Instance> {
self.cmd.run().await self.cmd.build().await
} }
} }
@@ -39,9 +54,9 @@ enum SubCommand {
} }
impl SubCommand { impl SubCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
match self { match self {
SubCommand::Attach(cmd) => cmd.run().await, SubCommand::Attach(cmd) => cmd.build().await,
} }
} }
} }
@@ -57,8 +72,8 @@ pub(crate) struct AttachCommand {
} }
impl AttachCommand { impl AttachCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
let mut repl = Repl::try_new(&self).await?; let repl = Repl::try_new(&self).await?;
repl.run().await Ok(Instance { repl })
} }
} }

View File

@@ -50,7 +50,7 @@ use crate::error::{
}; };
/// Captures the state of the repl, gathers commands and executes them one by one /// Captures the state of the repl, gathers commands and executes them one by one
pub(crate) struct Repl { pub struct Repl {
/// Rustyline editor for interacting with user on command line /// Rustyline editor for interacting with user on command line
rl: Editor<RustylineHelper>, rl: Editor<RustylineHelper>,

View File

@@ -24,6 +24,21 @@ use snafu::ResultExt;
use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu}; use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu};
use crate::toml_loader; use crate::toml_loader;
pub struct Instance {
datanode: Datanode,
}
impl Instance {
pub async fn run(&mut self) -> Result<()> {
self.datanode.start().await.context(StartDatanodeSnafu)
}
pub async fn stop(&self) -> Result<()> {
// TODO: handle datanode shutdown
Ok(())
}
}
#[derive(Parser)] #[derive(Parser)]
pub struct Command { pub struct Command {
#[clap(subcommand)] #[clap(subcommand)]
@@ -31,8 +46,8 @@ pub struct Command {
} }
impl Command { impl Command {
pub async fn run(self) -> Result<()> { pub async fn build(self) -> Result<Instance> {
self.subcmd.run().await self.subcmd.build().await
} }
} }
@@ -42,9 +57,9 @@ enum SubCommand {
} }
impl SubCommand { impl SubCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
match self { match self {
SubCommand::Start(cmd) => cmd.run().await, SubCommand::Start(cmd) => cmd.build().await,
} }
} }
} }
@@ -72,19 +87,16 @@ struct StartCommand {
} }
impl StartCommand { impl StartCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
logging::info!("Datanode start command: {:#?}", self); logging::info!("Datanode start command: {:#?}", self);
let opts: DatanodeOptions = self.try_into()?; let opts: DatanodeOptions = self.try_into()?;
logging::info!("Datanode options: {:#?}", opts); logging::info!("Datanode options: {:#?}", opts);
Datanode::new(opts) let datanode = Datanode::new(opts).await.context(StartDatanodeSnafu)?;
.await
.context(StartDatanodeSnafu)? Ok(Instance { datanode })
.start()
.await
.context(StartDatanodeSnafu)
} }
} }

View File

@@ -32,6 +32,12 @@ pub enum Error {
source: frontend::error::Error, source: frontend::error::Error,
}, },
#[snafu(display("Failed to build meta server, source: {}", source))]
BuildMetaServer {
#[snafu(backtrace)]
source: meta_srv::error::Error,
},
#[snafu(display("Failed to start meta server, source: {}", source))] #[snafu(display("Failed to start meta server, source: {}", source))]
StartMetaServer { StartMetaServer {
#[snafu(backtrace)] #[snafu(backtrace)]
@@ -138,6 +144,7 @@ impl ErrorExt for Error {
Error::StartDatanode { source } => source.status_code(), Error::StartDatanode { source } => source.status_code(),
Error::StartFrontend { source } => source.status_code(), Error::StartFrontend { source } => source.status_code(),
Error::StartMetaServer { source } => source.status_code(), Error::StartMetaServer { source } => source.status_code(),
Error::BuildMetaServer { source } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(), Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => { Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => {
StatusCode::InvalidArguments StatusCode::InvalidArguments

View File

@@ -19,7 +19,7 @@ use common_base::Plugins;
use frontend::frontend::FrontendOptions; use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions; use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions; use frontend::influxdb::InfluxdbOptions;
use frontend::instance::{FrontendInstance, Instance}; use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::mysql::MysqlOptions; use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions; use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions; use frontend::postgres::PostgresOptions;
@@ -34,6 +34,24 @@ use snafu::ResultExt;
use crate::error::{self, IllegalAuthConfigSnafu, Result}; use crate::error::{self, IllegalAuthConfigSnafu, Result};
use crate::toml_loader; use crate::toml_loader;
pub struct Instance {
frontend: FeInstance,
}
impl Instance {
pub async fn run(&mut self) -> Result<()> {
self.frontend
.start()
.await
.context(error::StartFrontendSnafu)
}
pub async fn stop(&self) -> Result<()> {
// TODO: handle frontend shutdown
Ok(())
}
}
#[derive(Parser)] #[derive(Parser)]
pub struct Command { pub struct Command {
#[clap(subcommand)] #[clap(subcommand)]
@@ -41,8 +59,8 @@ pub struct Command {
} }
impl Command { impl Command {
pub async fn run(self) -> Result<()> { pub async fn build(self) -> Result<Instance> {
self.subcmd.run().await self.subcmd.build().await
} }
} }
@@ -52,9 +70,9 @@ enum SubCommand {
} }
impl SubCommand { impl SubCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
match self { match self {
SubCommand::Start(cmd) => cmd.run().await, SubCommand::Start(cmd) => cmd.build().await,
} }
} }
} }
@@ -90,11 +108,11 @@ pub struct StartCommand {
} }
impl StartCommand { impl StartCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
let opts: FrontendOptions = self.try_into()?; let opts: FrontendOptions = self.try_into()?;
let mut instance = Instance::try_new_distributed(&opts, plugins.clone()) let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone())
.await .await
.context(error::StartFrontendSnafu)?; .context(error::StartFrontendSnafu)?;
@@ -103,7 +121,7 @@ impl StartCommand {
.await .await
.context(error::StartFrontendSnafu)?; .context(error::StartFrontendSnafu)?;
instance.start().await.context(error::StartFrontendSnafu) Ok(Instance { frontend: instance })
} }
} }

View File

@@ -14,13 +14,32 @@
use clap::Parser; use clap::Parser;
use common_telemetry::{info, logging, warn}; use common_telemetry::{info, logging, warn};
use meta_srv::bootstrap; use meta_srv::bootstrap::MetaSrvInstance;
use meta_srv::metasrv::MetaSrvOptions; use meta_srv::metasrv::MetaSrvOptions;
use snafu::ResultExt; use snafu::ResultExt;
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::{error, toml_loader}; use crate::{error, toml_loader};
pub struct Instance {
instance: MetaSrvInstance,
}
impl Instance {
pub async fn run(&mut self) -> Result<()> {
self.instance
.start()
.await
.context(error::StartMetaServerSnafu)?;
Ok(())
}
pub async fn stop(&self) -> Result<()> {
// TODO: handle metasrv shutdown
Ok(())
}
}
#[derive(Parser)] #[derive(Parser)]
pub struct Command { pub struct Command {
#[clap(subcommand)] #[clap(subcommand)]
@@ -28,8 +47,8 @@ pub struct Command {
} }
impl Command { impl Command {
pub async fn run(self) -> Result<()> { pub async fn build(self) -> Result<Instance> {
self.subcmd.run().await self.subcmd.build().await
} }
} }
@@ -39,9 +58,9 @@ enum SubCommand {
} }
impl SubCommand { impl SubCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
match self { match self {
SubCommand::Start(cmd) => cmd.run().await, SubCommand::Start(cmd) => cmd.build().await,
} }
} }
} }
@@ -63,16 +82,17 @@ struct StartCommand {
} }
impl StartCommand { impl StartCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
logging::info!("MetaSrv start command: {:#?}", self); logging::info!("MetaSrv start command: {:#?}", self);
let opts: MetaSrvOptions = self.try_into()?; let opts: MetaSrvOptions = self.try_into()?;
logging::info!("MetaSrv options: {:#?}", opts); logging::info!("MetaSrv options: {:#?}", opts);
let instance = MetaSrvInstance::new(opts)
bootstrap::bootstrap_meta_srv(opts)
.await .await
.context(error::StartMetaServerSnafu) .context(error::BuildMetaServerSnafu)?;
Ok(Instance { instance })
} }
} }

View File

@@ -47,8 +47,8 @@ pub struct Command {
} }
impl Command { impl Command {
pub async fn run(self) -> Result<()> { pub async fn build(self) -> Result<Instance> {
self.subcmd.run().await self.subcmd.build().await
} }
} }
@@ -58,9 +58,9 @@ enum SubCommand {
} }
impl SubCommand { impl SubCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
match self { match self {
SubCommand::Start(cmd) => cmd.run().await, SubCommand::Start(cmd) => cmd.build().await,
} }
} }
} }
@@ -133,6 +133,30 @@ impl StandaloneOptions {
} }
} }
pub struct Instance {
datanode: Datanode,
frontend: FeInstance,
}
impl Instance {
pub async fn run(&mut self) -> Result<()> {
// Start datanode instance before starting services, to avoid requests come in before internal components are started.
self.datanode
.start_instance()
.await
.context(StartDatanodeSnafu)?;
info!("Datanode instance started");
self.frontend.start().await.context(StartFrontendSnafu)?;
Ok(())
}
pub async fn stop(&self) -> Result<()> {
// TODO: handle standalone shutdown
Ok(())
}
}
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
struct StartCommand { struct StartCommand {
#[clap(long)] #[clap(long)]
@@ -164,7 +188,7 @@ struct StartCommand {
} }
impl StartCommand { impl StartCommand {
async fn run(self) -> Result<()> { async fn build(self) -> Result<Instance> {
let enable_memory_catalog = self.enable_memory_catalog; let enable_memory_catalog = self.enable_memory_catalog;
let config_file = self.config_file.clone(); let config_file = self.config_file.clone();
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
@@ -184,25 +208,18 @@ impl StartCommand {
fe_opts, dn_opts fe_opts, dn_opts
); );
let mut datanode = Datanode::new(dn_opts.clone()) let datanode = Datanode::new(dn_opts.clone())
.await .await
.context(StartDatanodeSnafu)?; .context(StartDatanodeSnafu)?;
let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?;
// Start datanode instance before starting services, to avoid requests come in before internal components are started. let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?;
datanode
.start_instance()
.await
.context(StartDatanodeSnafu)?;
info!("Datanode instance started");
frontend frontend
.build_servers(&fe_opts, plugins) .build_servers(&fe_opts, plugins)
.await .await
.context(StartFrontendSnafu)?; .context(StartFrontendSnafu)?;
frontend.start().await.context(StartFrontendSnafu)?; Ok(Instance { datanode, frontend })
Ok(())
} }
} }

View File

@@ -39,18 +39,45 @@ use crate::service::store::kv::ResettableKvStoreRef;
use crate::service::store::memory::MemStore; use crate::service::store::memory::MemStore;
use crate::{error, Result}; use crate::{error, Result};
// Bootstrap the rpc server to serve incoming request #[derive(Clone)]
pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> { pub struct MetaSrvInstance {
let meta_srv = make_meta_srv(opts.clone()).await?; meta_srv: MetaSrv,
bootstrap_meta_srv_with_router(opts, router(meta_srv)).await
opts: MetaSrvOptions,
} }
pub async fn bootstrap_meta_srv_with_router(opts: MetaSrvOptions, router: Router) -> Result<()> { impl MetaSrvInstance {
let listener = TcpListener::bind(&opts.bind_addr) pub async fn new(opts: MetaSrvOptions) -> Result<MetaSrvInstance> {
let meta_srv = build_meta_srv(&opts).await?;
Ok(MetaSrvInstance { meta_srv, opts })
}
pub async fn start(&self) -> Result<()> {
self.meta_srv.start().await;
bootstrap_meta_srv_with_router(&self.opts.bind_addr, router(self.meta_srv.clone())).await?;
Ok(())
}
pub async fn close(&self) -> Result<()> {
// TODO: shutdown the router
self.meta_srv.shutdown();
Ok(())
}
}
// Bootstrap the rpc server to serve incoming request
pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> {
let meta_srv = make_meta_srv(&opts).await?;
bootstrap_meta_srv_with_router(&opts.bind_addr, router(meta_srv)).await
}
pub async fn bootstrap_meta_srv_with_router(bind_addr: &str, router: Router) -> Result<()> {
let listener = TcpListener::bind(bind_addr)
.await .await
.context(error::TcpBindSnafu { .context(error::TcpBindSnafu { addr: bind_addr })?;
addr: &opts.bind_addr,
})?;
let listener = TcpListenerStream::new(listener); let listener = TcpListenerStream::new(listener);
router router
@@ -72,7 +99,7 @@ pub fn router(meta_srv: MetaSrv) -> Router {
.add_service(admin::make_admin_service(meta_srv)) .add_service(admin::make_admin_service(meta_srv))
} }
pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> { pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
let (kv_store, election, lock) = if opts.use_memory_store { let (kv_store, election, lock) = if opts.use_memory_store {
(Arc::new(MemStore::new()) as _, None, None) (Arc::new(MemStore::new()) as _, None, None)
} else { } else {
@@ -107,7 +134,7 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
}; };
let meta_srv = MetaSrvBuilder::new() let meta_srv = MetaSrvBuilder::new()
.options(opts) .options(opts.clone())
.kv_store(kv_store) .kv_store(kv_store)
.in_memory(in_memory) .in_memory(in_memory)
.selector(selector) .selector(selector)
@@ -117,6 +144,12 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
.build() .build()
.await; .await;
Ok(meta_srv)
}
pub async fn make_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
let meta_srv = build_meta_srv(opts).await?;
meta_srv.start().await; meta_srv.start().await;
Ok(meta_srv) Ok(meta_srv)