mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-02 13:20:40 +00:00
* feat: adds datanode config file supporting, close #156 * doc: update readme * fix: address CR problems * fix: remove unused log
This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -706,8 +706,11 @@ dependencies = [
|
||||
"common-error",
|
||||
"common-telemetry",
|
||||
"datanode",
|
||||
"serde",
|
||||
"snafu",
|
||||
"tempdir",
|
||||
"tokio",
|
||||
"toml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4435,6 +4438,15 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.5.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.7.2"
|
||||
|
||||
@@ -53,3 +53,9 @@ OR
|
||||
// Start datanode with `log-dir` and `log-level` options.
|
||||
cargo run -- --log-dir=logs --log-level=debug datanode start
|
||||
```
|
||||
|
||||
Start datanode with config file:
|
||||
|
||||
```
|
||||
cargo run -- --log-dir=logs --log-level=debug datanode start -c ./config/datanode.example.toml
|
||||
```
|
||||
|
||||
7
config/datanode.example.toml
Normal file
7
config/datanode.example.toml
Normal file
@@ -0,0 +1,7 @@
|
||||
http_addr = '0.0.0.0:3000'
|
||||
rpc_addr = '0.0.0.0:3001'
|
||||
wal_dir = '/tmp/wal'
|
||||
|
||||
[storage]
|
||||
type = 'File'
|
||||
data_dir = '/tmp/greptimedb/data/'
|
||||
@@ -14,3 +14,8 @@ common-telemetry = { path = "../common/telemetry", features = ["deadlock_detecti
|
||||
datanode = { path = "../datanode" }
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
tokio = { version = "1.18", features = ["full"] }
|
||||
toml = "0.5"
|
||||
|
||||
[dev-dependencies]
|
||||
serde = "1.0"
|
||||
tempdir = "0.3"
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use clap::Parser;
|
||||
use common_telemetry::logging;
|
||||
use datanode::datanode::{Datanode, DatanodeOptions};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{Result, StartDatanodeSnafu};
|
||||
use crate::error::{Error, Result, StartDatanodeSnafu};
|
||||
use crate::toml_loader;
|
||||
|
||||
#[derive(Parser)]
|
||||
pub struct Command {
|
||||
@@ -31,15 +33,23 @@ impl SubCommand {
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct StartCommand {
|
||||
#[clap(long, default_value = "0.0.0.0:3000")]
|
||||
http_addr: String,
|
||||
#[clap(long, default_value = "0.0.0.0:3001")]
|
||||
rpc_addr: String,
|
||||
#[clap(long)]
|
||||
http_addr: Option<String>,
|
||||
#[clap(long)]
|
||||
rpc_addr: Option<String>,
|
||||
#[clap(short, long)]
|
||||
config_file: Option<String>,
|
||||
}
|
||||
|
||||
impl StartCommand {
|
||||
async fn run(self) -> Result<()> {
|
||||
Datanode::new(self.into())
|
||||
logging::info!("Datanode start command: {:#?}", self);
|
||||
|
||||
let opts: DatanodeOptions = self.try_into()?;
|
||||
|
||||
logging::info!("Datanode options: {:#?}", opts);
|
||||
|
||||
Datanode::new(opts)
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?
|
||||
.start()
|
||||
@@ -48,12 +58,23 @@ impl StartCommand {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StartCommand> for DatanodeOptions {
|
||||
fn from(cmd: StartCommand) -> Self {
|
||||
DatanodeOptions {
|
||||
http_addr: cmd.http_addr,
|
||||
rpc_addr: cmd.rpc_addr,
|
||||
..Default::default()
|
||||
impl TryFrom<StartCommand> for DatanodeOptions {
|
||||
type Error = Error;
|
||||
fn try_from(cmd: StartCommand) -> Result<Self> {
|
||||
let mut opts: DatanodeOptions = if let Some(path) = cmd.config_file {
|
||||
toml_loader::from_file!(&path)?
|
||||
} else {
|
||||
DatanodeOptions::default()
|
||||
};
|
||||
|
||||
if let Some(addr) = cmd.http_addr {
|
||||
opts.http_addr = addr;
|
||||
}
|
||||
|
||||
if let Some(addr) = cmd.rpc_addr {
|
||||
opts.rpc_addr = addr;
|
||||
}
|
||||
|
||||
Ok(opts)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,11 +5,20 @@ use common_error::prelude::*;
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum Error {
|
||||
#[snafu(display("Fail to start datanode, source: {}", source))]
|
||||
#[snafu(display("Failed to start datanode, source: {}", source))]
|
||||
StartDatanode {
|
||||
#[snafu(backtrace)]
|
||||
source: datanode::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to read config file: {}, source: {}", path, source))]
|
||||
ReadConfig {
|
||||
source: std::io::Error,
|
||||
path: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse config, source: {}", source))]
|
||||
ParseConfig { source: toml::de::Error },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -18,6 +27,7 @@ impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
Error::StartDatanode { source } => source.status_code(),
|
||||
Error::ReadConfig { .. } | Error::ParseConfig { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,3 +39,23 @@ impl ErrorExt for Error {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn raise_read_config_error() -> std::result::Result<(), std::io::Error> {
|
||||
Err(std::io::ErrorKind::NotFound.into())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_error() {
|
||||
let e = raise_read_config_error()
|
||||
.context(ReadConfigSnafu { path: "test" })
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
assert!(e.backtrace_opt().is_none());
|
||||
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
pub mod datanode;
|
||||
pub mod error;
|
||||
mod toml_loader;
|
||||
|
||||
54
src/cmd/src/toml_loader.rs
Normal file
54
src/cmd/src/toml_loader.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
macro_rules! from_file {
|
||||
($path: expr) => {
|
||||
toml::from_str(
|
||||
&std::fs::read_to_string($path)
|
||||
.context(crate::error::ReadConfigSnafu { path: $path })?,
|
||||
)
|
||||
.context(crate::error::ParseConfigSnafu)
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) use from_file;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use tempdir::TempDir;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Result;
|
||||
|
||||
#[derive(Clone, PartialEq, Debug, Deserialize, Serialize)]
|
||||
struct MockConfig {
|
||||
path: String,
|
||||
port: u32,
|
||||
host: String,
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_from_file() -> Result<()> {
|
||||
let config = MockConfig {
|
||||
path: "/tmp".to_string(),
|
||||
port: 999,
|
||||
host: "greptime.test".to_string(),
|
||||
};
|
||||
|
||||
let dir = TempDir::new("test_from_file").unwrap();
|
||||
let test_file = format!("{}/test.toml", dir.path().to_str().unwrap());
|
||||
|
||||
let s = toml::to_string(&config).unwrap();
|
||||
assert!(s.contains("host") && s.contains("path") && s.contains("port"));
|
||||
|
||||
let mut file = File::create(&test_file).unwrap();
|
||||
file.write_all(s.as_bytes()).unwrap();
|
||||
|
||||
let loaded_config: MockConfig = from_file!(&test_file)?;
|
||||
assert_eq!(loaded_config, config);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,49 +1,40 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::instance::{Instance, InstanceRef};
|
||||
use crate::server::Services;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FileStoreConfig {
|
||||
/// Storage path
|
||||
pub store_dir: String,
|
||||
}
|
||||
|
||||
impl Default for FileStoreConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
store_dir: "/tmp/greptimedb/data/".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum ObjectStoreConfig {
|
||||
File(FileStoreConfig),
|
||||
File { data_dir: String },
|
||||
}
|
||||
|
||||
impl Default for ObjectStoreConfig {
|
||||
fn default() -> Self {
|
||||
ObjectStoreConfig::File(FileStoreConfig::default())
|
||||
ObjectStoreConfig::File {
|
||||
data_dir: "/tmp/greptimedb/data/".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct DatanodeOptions {
|
||||
pub http_addr: String,
|
||||
pub rpc_addr: String,
|
||||
pub wal_dir: String,
|
||||
pub store_config: ObjectStoreConfig,
|
||||
pub storage: ObjectStoreConfig,
|
||||
}
|
||||
|
||||
impl Default for DatanodeOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
http_addr: Default::default(),
|
||||
rpc_addr: Default::default(),
|
||||
http_addr: "0.0.0.0:3000".to_string(),
|
||||
rpc_addr: "0.0.0.0:3001".to_string(),
|
||||
wal_dir: "/tmp/wal".to_string(),
|
||||
store_config: ObjectStoreConfig::default(),
|
||||
storage: ObjectStoreConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ pub type InstanceRef = Arc<Instance>;
|
||||
|
||||
impl Instance {
|
||||
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
|
||||
let object_store = new_object_store(&opts.store_config).await?;
|
||||
let object_store = new_object_store(&opts.storage).await?;
|
||||
let log_store = create_local_file_log_store(opts).await?;
|
||||
|
||||
let table_engine = DefaultEngine::new(
|
||||
@@ -177,20 +177,20 @@ impl Instance {
|
||||
|
||||
async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
|
||||
// TODO(dennis): supports other backend
|
||||
let store_dir = util::normalize_dir(match store_config {
|
||||
ObjectStoreConfig::File(file) => &file.store_dir,
|
||||
let data_dir = util::normalize_dir(match store_config {
|
||||
ObjectStoreConfig::File { data_dir } => data_dir,
|
||||
});
|
||||
|
||||
fs::create_dir_all(path::Path::new(&store_dir))
|
||||
.context(error::CreateDirSnafu { dir: &store_dir })?;
|
||||
fs::create_dir_all(path::Path::new(&data_dir))
|
||||
.context(error::CreateDirSnafu { dir: &data_dir })?;
|
||||
|
||||
info!("The storage directory is: {}", &store_dir);
|
||||
info!("The storage directory is: {}", &data_dir);
|
||||
|
||||
let accessor = Backend::build()
|
||||
.root(&store_dir)
|
||||
.root(&data_dir)
|
||||
.finish()
|
||||
.await
|
||||
.context(error::InitBackendSnafu { dir: &store_dir })?;
|
||||
.context(error::InitBackendSnafu { dir: &data_dir })?;
|
||||
|
||||
Ok(ObjectStore::new(accessor))
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use tempdir::TempDir;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, FileStoreConfig, ObjectStoreConfig};
|
||||
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
|
||||
/// Create a tmp dir(will be deleted once it goes out of scope.) and a default `DatanodeOptions`,
|
||||
/// Only for test.
|
||||
@@ -16,9 +16,9 @@ pub fn create_tmp_dir_and_datanode_opts() -> (DatanodeOptions, TestGuard) {
|
||||
let data_tmp_dir = TempDir::new("/tmp/greptimedb_test_data").unwrap();
|
||||
let opts = DatanodeOptions {
|
||||
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
store_config: ObjectStoreConfig::File(FileStoreConfig {
|
||||
store_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
}),
|
||||
storage: ObjectStoreConfig::File {
|
||||
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user