feat(cmd): command refactor (#44)

* feat(cmd): command refactor
This commit is contained in:
fengjiachun
2022-06-15 20:08:00 +08:00
committed by GitHub
parent 633524709b
commit 725a261b55
13 changed files with 200 additions and 54 deletions

2
Cargo.lock generated
View File

@@ -527,8 +527,10 @@ name = "cmd"
version = "0.1.0"
dependencies = [
"clap",
"common-error",
"common-telemetry",
"datanode",
"snafu",
"tokio",
]

View File

@@ -3,3 +3,20 @@
[![codecov](https://codecov.io/gh/GrepTimeTeam/greptimedb/branch/develop/graph/badge.svg?token=FITFDI3J3C)](https://codecov.io/gh/GrepTimeTeam/greptimedb)
GreptimeDB: the next-generation hybrid timeseries/analytics processing database in the cloud.
## Usage
```
// Start datanode with default options.
cargo run -- datanode start
OR
// Start datanode with `http-addr` option.
cargo run -- datanode start --http-addr=0.0.0.0:9999
OR
// Start datanode with `log-dir` and `log-level` options.
cargo run -- --log-dir=logs --log-level=debug datanode start
```

View File

@@ -9,6 +9,8 @@ path = "src/bin/greptime.rs"
[dependencies]
clap = { version = "3.1", features = ["derive"] }
common-telemetry = { path = "../common/telemetry" , features = ["deadlock_detection"]}
common-error = { path = "../common/error" }
common-telemetry = { path = "../common/telemetry", features = ["deadlock_detection"]}
datanode = { path = "../datanode" }
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18.0", features = ["full"] }

View File

@@ -1,33 +1,72 @@
use std::fmt;
use clap::Parser;
use cmd::opts::{GrepTimeOpts, NodeType};
use common_telemetry::{self, logging::error};
use datanode::DataNode;
use cmd::datanode;
use cmd::error::Result;
use common_telemetry::{self, logging::error, logging::info};
async fn datanode_main(_opts: &GrepTimeOpts) {
match DataNode::new() {
Ok(data_node) => {
if let Err(e) = data_node.start().await {
error!(e; "Fail to start data node");
}
#[derive(Parser)]
#[clap(name = "greptimedb")]
struct Command {
#[clap(long, default_value = "/tmp/greptime/logs")]
log_dir: String,
#[clap(long, default_value = "info")]
log_level: String,
#[clap(subcommand)]
subcmd: SubCommand,
}
impl Command {
async fn run(self) -> Result<()> {
self.subcmd.run().await
}
}
#[derive(Parser)]
enum SubCommand {
#[clap(name = "datanode")]
Datanode(datanode::Command),
}
impl SubCommand {
async fn run(self) -> Result<()> {
match self {
SubCommand::Datanode(cmd) => cmd.run().await,
}
}
}
Err(e) => error!(e; "Fail to new data node"),
impl fmt::Display for SubCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SubCommand::Datanode(..) => write!(f, "greptime-datanode"),
}
}
}
#[tokio::main]
async fn main() {
let opts = GrepTimeOpts::parse();
let node_type = opts.node_type;
// TODO(dennis): 1. adds ip/port to app
// 2. config log dir
let app = format!("{node_type:?}-node").to_lowercase();
async fn main() -> Result<()> {
let cmd = Command::parse();
// TODO(dennis):
// 1. adds ip/port to app
let app_name = &cmd.subcmd.to_string();
let log_dir = &cmd.log_dir;
let log_level = &cmd.log_level;
common_telemetry::set_panic_hook();
common_telemetry::init_default_metrics_recorder();
let _guard = common_telemetry::init_global_logging(&app, "logs", "info", false);
let _guard = common_telemetry::init_global_logging(app_name, log_dir, log_level, false);
match node_type {
NodeType::Data => datanode_main(&opts).await,
tokio::select! {
result = cmd.run() => {
if let Err(err) = result {
error!(err; "Fatal error occurs!");
}
}
_ = tokio::signal::ctrl_c() => {
info!("Goodbye!");
}
}
Ok(())
}

57
src/cmd/src/datanode.rs Normal file
View File

@@ -0,0 +1,57 @@
use clap::Parser;
use datanode::{Datanode, DatanodeOptions};
use snafu::ResultExt;
use crate::error::{Result, StartDatanodeSnafu};
#[derive(Parser)]
pub struct Command {
#[clap(subcommand)]
subcmd: SubCommand,
}
impl Command {
pub async fn run(self) -> Result<()> {
self.subcmd.run().await
}
}
#[derive(Parser)]
enum SubCommand {
Start(StartCommand),
}
impl SubCommand {
async fn run(self) -> Result<()> {
match self {
SubCommand::Start(cmd) => cmd.run().await,
}
}
}
#[derive(Debug, Parser)]
struct StartCommand {
#[clap(long, default_value = "0.0.0.0:3000")]
http_addr: String,
#[clap(long, default_value = "0.0.0.0:3001")]
rpc_addr: String,
}
impl StartCommand {
async fn run(self) -> Result<()> {
Datanode::new(self.into())
.context(StartDatanodeSnafu)?
.start()
.await
.context(StartDatanodeSnafu)
}
}
impl From<StartCommand> for DatanodeOptions {
fn from(cmd: StartCommand) -> Self {
DatanodeOptions {
http_addr: cmd.http_addr,
rpc_addr: cmd.rpc_addr,
}
}
}

31
src/cmd/src/error.rs Normal file
View File

@@ -0,0 +1,31 @@
use std::any::Any;
use common_error::prelude::*;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Fail to start datanode, source: {}", source))]
StartDatanode {
#[snafu(backtrace)]
source: datanode::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::StartDatanode { source } => source.status_code(),
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -1 +1,2 @@
pub mod opts;
pub mod datanode;
pub mod error;

View File

@@ -1,15 +0,0 @@
//! greptime commandline options
use clap::{ArgEnum, Parser};
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ArgEnum)]
pub enum NodeType {
/// Data node
Data,
}
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct GrepTimeOpts {
#[clap(name = "type", short, long, arg_enum)]
pub node_type: NodeType,
}

View File

@@ -8,19 +8,26 @@ use crate::error::{NewCatalogSnafu, Result};
use crate::instance::{Instance, InstanceRef};
use crate::server::Services;
/// DataNode service.
pub struct DataNode {
#[derive(Debug)]
pub struct DatanodeOptions {
pub http_addr: String,
pub rpc_addr: String,
}
/// Datanode service.
pub struct Datanode {
opts: DatanodeOptions,
services: Services,
_catalog_list: CatalogListRef,
_instance: InstanceRef,
}
impl DataNode {
pub fn new() -> Result<DataNode> {
impl Datanode {
pub fn new(opts: DatanodeOptions) -> Result<Datanode> {
let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?;
let instance = Arc::new(Instance::new(catalog_list.clone()));
Ok(Self {
opts,
services: Services::new(instance.clone()),
_catalog_list: catalog_list,
_instance: instance,
@@ -28,6 +35,6 @@ impl DataNode {
}
pub async fn start(&self) -> Result<()> {
self.services.start().await
self.services.start(&self.opts).await
}
}

View File

@@ -22,6 +22,12 @@ pub enum Error {
// a backtrace is not carried in this varaint.
#[snafu(display("Fail to start HTTP server, source: {}", source))]
StartHttp { source: hyper::Error },
#[snafu(display("Fail to parse address {}, source: {}", addr, source))]
ParseAddr {
addr: String,
source: std::net::AddrParseError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -31,7 +37,7 @@ impl ErrorExt for Error {
match self {
Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(),
// TODO(yingwen): Further categorize http error.
Error::StartHttp { .. } => StatusCode::Internal,
Error::StartHttp { .. } | Error::ParseAddr { .. } => StatusCode::Internal,
}
}

View File

@@ -5,4 +5,5 @@ pub mod instance;
mod metric;
pub mod server;
pub use crate::datanode::DataNode;
pub use crate::datanode::Datanode;
pub use crate::datanode::DatanodeOptions;

View File

@@ -3,6 +3,7 @@ pub mod http;
use http::HttpServer;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
use crate::instance::InstanceRef;
@@ -18,7 +19,7 @@ impl Services {
}
}
pub async fn start(&self) -> Result<()> {
self.http_server.start().await
pub async fn start(&self, opts: &DatanodeOptions) -> Result<()> {
self.http_server.start(&opts.http_addr).await
}
}

View File

@@ -18,7 +18,7 @@ use snafu::ResultExt;
use tower::{timeout::TimeoutLayer, ServiceBuilder};
use tower_http::trace::TraceLayer;
use crate::error::{Result, StartHttpSnafu};
use crate::error::{ParseAddrSnafu, Result, StartHttpSnafu};
use crate::server::InstanceRef;
/// Http server
@@ -118,17 +118,14 @@ impl HttpServer {
)
}
pub async fn start(&self) -> Result<()> {
pub async fn start(&self, addr: &str) -> Result<()> {
let app = self.make_app();
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
info!("Datanode HTTP server is listening on {}", addr);
let server = axum::Server::bind(&addr).serve(app.into_make_service());
let socket_addr: SocketAddr = addr.parse().context(ParseAddrSnafu { addr })?;
info!("Datanode HTTP server is listening on {}", socket_addr);
let server = axum::Server::bind(&socket_addr).serve(app.into_make_service());
let graceful = server.with_graceful_shutdown(shutdown_signal());
graceful.await.context(StartHttpSnafu)?;
Ok(())
graceful.await.context(StartHttpSnafu)
}
}