From 725a261b559ed1a2608e06a2f8e28b0d60f5474b Mon Sep 17 00:00:00 2001 From: fengjiachun Date: Wed, 15 Jun 2022 20:08:00 +0800 Subject: [PATCH] feat(cmd): command refactor (#44) * feat(cmd): command refactor --- Cargo.lock | 2 + README.md | 17 ++++++++ src/cmd/Cargo.toml | 4 +- src/cmd/src/bin/greptime.rs | 77 +++++++++++++++++++++++++-------- src/cmd/src/datanode.rs | 57 ++++++++++++++++++++++++ src/cmd/src/error.rs | 31 +++++++++++++ src/cmd/src/lib.rs | 3 +- src/cmd/src/opts.rs | 15 ------- src/datanode/src/datanode.rs | 17 +++++--- src/datanode/src/error.rs | 8 +++- src/datanode/src/lib.rs | 3 +- src/datanode/src/server.rs | 5 ++- src/datanode/src/server/http.rs | 15 +++---- 13 files changed, 200 insertions(+), 54 deletions(-) create mode 100644 src/cmd/src/datanode.rs create mode 100644 src/cmd/src/error.rs delete mode 100644 src/cmd/src/opts.rs diff --git a/Cargo.lock b/Cargo.lock index 9df08ba630..4fb3da8700 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -527,8 +527,10 @@ name = "cmd" version = "0.1.0" dependencies = [ "clap", + "common-error", "common-telemetry", "datanode", + "snafu", "tokio", ] diff --git a/README.md b/README.md index af3a6ae4c6..cc052e74df 100644 --- a/README.md +++ b/README.md @@ -3,3 +3,20 @@ [![codecov](https://codecov.io/gh/GrepTimeTeam/greptimedb/branch/develop/graph/badge.svg?token=FITFDI3J3C)](https://codecov.io/gh/GrepTimeTeam/greptimedb) GreptimeDB: the next-generation hybrid timeseries/analytics processing database in the cloud. + +## Usage + +``` +// Start datanode with default options. +cargo run -- datanode start + +OR + +// Start datanode with `http-addr` option. +cargo run -- datanode start --http-addr=0.0.0.0:9999 + +OR + +// Start datanode with `log-dir` and `log-level` options. +cargo run -- --log-dir=logs --log-level=debug datanode start +``` \ No newline at end of file diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index e086428909..7552d838ee 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -9,6 +9,8 @@ path = "src/bin/greptime.rs" [dependencies] clap = { version = "3.1", features = ["derive"] } -common-telemetry = { path = "../common/telemetry" , features = ["deadlock_detection"]} +common-error = { path = "../common/error" } +common-telemetry = { path = "../common/telemetry", features = ["deadlock_detection"]} datanode = { path = "../datanode" } +snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.18.0", features = ["full"] } diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index 87fef1c422..26436e6af3 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -1,33 +1,72 @@ +use std::fmt; + use clap::Parser; -use cmd::opts::{GrepTimeOpts, NodeType}; -use common_telemetry::{self, logging::error}; -use datanode::DataNode; +use cmd::datanode; +use cmd::error::Result; +use common_telemetry::{self, logging::error, logging::info}; -async fn datanode_main(_opts: &GrepTimeOpts) { - match DataNode::new() { - Ok(data_node) => { - if let Err(e) = data_node.start().await { - error!(e; "Fail to start data node"); - } +#[derive(Parser)] +#[clap(name = "greptimedb")] +struct Command { + #[clap(long, default_value = "/tmp/greptime/logs")] + log_dir: String, + #[clap(long, default_value = "info")] + log_level: String, + #[clap(subcommand)] + subcmd: SubCommand, +} + +impl Command { + async fn run(self) -> Result<()> { + self.subcmd.run().await + } +} + +#[derive(Parser)] +enum SubCommand { + #[clap(name = "datanode")] + Datanode(datanode::Command), +} + +impl SubCommand { + async fn run(self) -> Result<()> { + match self { + SubCommand::Datanode(cmd) => cmd.run().await, } + } +} - Err(e) => error!(e; "Fail to new data node"), +impl fmt::Display for SubCommand { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SubCommand::Datanode(..) => write!(f, "greptime-datanode"), + } } } #[tokio::main] -async fn main() { - let opts = GrepTimeOpts::parse(); - let node_type = opts.node_type; - // TODO(dennis): 1. adds ip/port to app - // 2. config log dir - let app = format!("{node_type:?}-node").to_lowercase(); +async fn main() -> Result<()> { + let cmd = Command::parse(); + // TODO(dennis): + // 1. adds ip/port to app + let app_name = &cmd.subcmd.to_string(); + let log_dir = &cmd.log_dir; + let log_level = &cmd.log_level; common_telemetry::set_panic_hook(); common_telemetry::init_default_metrics_recorder(); - let _guard = common_telemetry::init_global_logging(&app, "logs", "info", false); + let _guard = common_telemetry::init_global_logging(app_name, log_dir, log_level, false); - match node_type { - NodeType::Data => datanode_main(&opts).await, + tokio::select! { + result = cmd.run() => { + if let Err(err) = result { + error!(err; "Fatal error occurs!"); + } + } + _ = tokio::signal::ctrl_c() => { + info!("Goodbye!"); + } } + + Ok(()) } diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs new file mode 100644 index 0000000000..3f99164cd8 --- /dev/null +++ b/src/cmd/src/datanode.rs @@ -0,0 +1,57 @@ +use clap::Parser; +use datanode::{Datanode, DatanodeOptions}; +use snafu::ResultExt; + +use crate::error::{Result, StartDatanodeSnafu}; + +#[derive(Parser)] +pub struct Command { + #[clap(subcommand)] + subcmd: SubCommand, +} + +impl Command { + pub async fn run(self) -> Result<()> { + self.subcmd.run().await + } +} + +#[derive(Parser)] +enum SubCommand { + Start(StartCommand), +} + +impl SubCommand { + async fn run(self) -> Result<()> { + match self { + SubCommand::Start(cmd) => cmd.run().await, + } + } +} + +#[derive(Debug, Parser)] +struct StartCommand { + #[clap(long, default_value = "0.0.0.0:3000")] + http_addr: String, + #[clap(long, default_value = "0.0.0.0:3001")] + rpc_addr: String, +} + +impl StartCommand { + async fn run(self) -> Result<()> { + Datanode::new(self.into()) + .context(StartDatanodeSnafu)? + .start() + .await + .context(StartDatanodeSnafu) + } +} + +impl From for DatanodeOptions { + fn from(cmd: StartCommand) -> Self { + DatanodeOptions { + http_addr: cmd.http_addr, + rpc_addr: cmd.rpc_addr, + } + } +} diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs new file mode 100644 index 0000000000..2cb414ebcf --- /dev/null +++ b/src/cmd/src/error.rs @@ -0,0 +1,31 @@ +use std::any::Any; + +use common_error::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Fail to start datanode, source: {}", source))] + StartDatanode { + #[snafu(backtrace)] + source: datanode::error::Error, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::StartDatanode { source } => source.status_code(), + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 9d1c99982e..d9276f0f3c 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -1 +1,2 @@ -pub mod opts; +pub mod datanode; +pub mod error; diff --git a/src/cmd/src/opts.rs b/src/cmd/src/opts.rs deleted file mode 100644 index 9fc3f80148..0000000000 --- a/src/cmd/src/opts.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! greptime commandline options -use clap::{ArgEnum, Parser}; - -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ArgEnum)] -pub enum NodeType { - /// Data node - Data, -} - -#[derive(Parser, Debug)] -#[clap(author, version, about, long_about = None)] -pub struct GrepTimeOpts { - #[clap(name = "type", short, long, arg_enum)] - pub node_type: NodeType, -} diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index cef5f35745..ab1fef2967 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -8,19 +8,26 @@ use crate::error::{NewCatalogSnafu, Result}; use crate::instance::{Instance, InstanceRef}; use crate::server::Services; -/// DataNode service. -pub struct DataNode { +#[derive(Debug)] +pub struct DatanodeOptions { + pub http_addr: String, + pub rpc_addr: String, +} +/// Datanode service. +pub struct Datanode { + opts: DatanodeOptions, services: Services, _catalog_list: CatalogListRef, _instance: InstanceRef, } -impl DataNode { - pub fn new() -> Result { +impl Datanode { + pub fn new(opts: DatanodeOptions) -> Result { let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?; let instance = Arc::new(Instance::new(catalog_list.clone())); Ok(Self { + opts, services: Services::new(instance.clone()), _catalog_list: catalog_list, _instance: instance, @@ -28,6 +35,6 @@ impl DataNode { } pub async fn start(&self) -> Result<()> { - self.services.start().await + self.services.start(&self.opts).await } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 537d5c75dc..6d97d0bc23 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -22,6 +22,12 @@ pub enum Error { // a backtrace is not carried in this varaint. #[snafu(display("Fail to start HTTP server, source: {}", source))] StartHttp { source: hyper::Error }, + + #[snafu(display("Fail to parse address {}, source: {}", addr, source))] + ParseAddr { + addr: String, + source: std::net::AddrParseError, + }, } pub type Result = std::result::Result; @@ -31,7 +37,7 @@ impl ErrorExt for Error { match self { Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(), // TODO(yingwen): Further categorize http error. - Error::StartHttp { .. } => StatusCode::Internal, + Error::StartHttp { .. } | Error::ParseAddr { .. } => StatusCode::Internal, } } diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 57de3c7e56..c466613256 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -5,4 +5,5 @@ pub mod instance; mod metric; pub mod server; -pub use crate::datanode::DataNode; +pub use crate::datanode::Datanode; +pub use crate::datanode::DatanodeOptions; diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index d7a2aca89a..8ad559d59d 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -3,6 +3,7 @@ pub mod http; use http::HttpServer; +use crate::datanode::DatanodeOptions; use crate::error::Result; use crate::instance::InstanceRef; @@ -18,7 +19,7 @@ impl Services { } } - pub async fn start(&self) -> Result<()> { - self.http_server.start().await + pub async fn start(&self, opts: &DatanodeOptions) -> Result<()> { + self.http_server.start(&opts.http_addr).await } } diff --git a/src/datanode/src/server/http.rs b/src/datanode/src/server/http.rs index b186a1431a..f662ce3535 100644 --- a/src/datanode/src/server/http.rs +++ b/src/datanode/src/server/http.rs @@ -18,7 +18,7 @@ use snafu::ResultExt; use tower::{timeout::TimeoutLayer, ServiceBuilder}; use tower_http::trace::TraceLayer; -use crate::error::{Result, StartHttpSnafu}; +use crate::error::{ParseAddrSnafu, Result, StartHttpSnafu}; use crate::server::InstanceRef; /// Http server @@ -118,17 +118,14 @@ impl HttpServer { ) } - pub async fn start(&self) -> Result<()> { + pub async fn start(&self, addr: &str) -> Result<()> { let app = self.make_app(); - - let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); - info!("Datanode HTTP server is listening on {}", addr); - let server = axum::Server::bind(&addr).serve(app.into_make_service()); + let socket_addr: SocketAddr = addr.parse().context(ParseAddrSnafu { addr })?; + info!("Datanode HTTP server is listening on {}", socket_addr); + let server = axum::Server::bind(&socket_addr).serve(app.into_make_service()); let graceful = server.with_graceful_shutdown(shutdown_signal()); - graceful.await.context(StartHttpSnafu)?; - - Ok(()) + graceful.await.context(StartHttpSnafu) } }