mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 23:10:37 +00:00
fix: respect node id and metasrv addr in config file (#542)
* fix: respect node id and metasrv addr in config file * fix: fmt * fix: unit test
This commit is contained in:
@@ -93,31 +93,20 @@ impl TryFrom<StartCommand> for DatanodeOptions {
|
||||
opts.mysql_addr = addr;
|
||||
}
|
||||
|
||||
match (cmd.metasrv_addr, cmd.node_id) {
|
||||
(Some(meta_addr), Some(node_id)) => {
|
||||
// Running mode is only set to Distributed when
|
||||
// both metasrv addr and node id are set in
|
||||
// commandline options
|
||||
opts.meta_client_opts.metasrv_addr = meta_addr.clone();
|
||||
opts.node_id = node_id;
|
||||
opts.metasrv_addr = Some(vec![meta_addr]);
|
||||
opts.mode = Mode::Distributed;
|
||||
}
|
||||
(None, None) => {
|
||||
opts.mode = Mode::Standalone;
|
||||
}
|
||||
(None, Some(_)) => {
|
||||
return MissingConfigSnafu {
|
||||
msg: "Missing metasrv address option",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
(Some(_), None) => {
|
||||
return MissingConfigSnafu {
|
||||
msg: "Missing node id option",
|
||||
}
|
||||
.fail();
|
||||
if let Some(node_id) = cmd.node_id {
|
||||
opts.node_id = Some(node_id);
|
||||
}
|
||||
|
||||
if let Some(meta_addr) = cmd.metasrv_addr {
|
||||
opts.meta_client_opts.metasrv_addr = meta_addr;
|
||||
opts.mode = Mode::Distributed;
|
||||
}
|
||||
|
||||
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
|
||||
return MissingConfigSnafu {
|
||||
msg: "Missing node id option",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
Ok(opts)
|
||||
}
|
||||
@@ -198,13 +187,32 @@ mod tests {
|
||||
config_file: None,
|
||||
})
|
||||
.is_err());
|
||||
assert!(DatanodeOptions::try_from(StartCommand {
|
||||
|
||||
// Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value
|
||||
DatanodeOptions::try_from(StartCommand {
|
||||
node_id: Some(42),
|
||||
rpc_addr: None,
|
||||
mysql_addr: None,
|
||||
metasrv_addr: None,
|
||||
config_file: None,
|
||||
})
|
||||
.is_err());
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_config() {
|
||||
let dn_opts = DatanodeOptions::try_from(StartCommand {
|
||||
node_id: None,
|
||||
rpc_addr: None,
|
||||
mysql_addr: None,
|
||||
metasrv_addr: None,
|
||||
config_file: Some(format!(
|
||||
"{}/../../config/datanode.example.toml",
|
||||
std::env::current_dir().unwrap().as_path().to_str().unwrap()
|
||||
)),
|
||||
})
|
||||
.unwrap();
|
||||
assert_eq!(Some(42), dn_opts.node_id);
|
||||
assert_eq!("1.1.1.1:3002", dn_opts.meta_client_opts.metasrv_addr);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ impl Default for ObjectStoreConfig {
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct DatanodeOptions {
|
||||
pub node_id: u64,
|
||||
pub node_id: Option<u64>,
|
||||
pub rpc_addr: String,
|
||||
pub rpc_runtime_size: usize,
|
||||
pub mysql_addr: String,
|
||||
@@ -48,13 +48,12 @@ pub struct DatanodeOptions {
|
||||
pub wal_dir: String,
|
||||
pub storage: ObjectStoreConfig,
|
||||
pub mode: Mode,
|
||||
pub metasrv_addr: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
impl Default for DatanodeOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
node_id: 0,
|
||||
node_id: None,
|
||||
rpc_addr: "127.0.0.1:3001".to_string(),
|
||||
rpc_runtime_size: 8,
|
||||
mysql_addr: "127.0.0.1:3306".to_string(),
|
||||
@@ -63,7 +62,6 @@ impl Default for DatanodeOptions {
|
||||
wal_dir: "/tmp/greptimedb/wal".to_string(),
|
||||
storage: ObjectStoreConfig::default(),
|
||||
mode: Mode::Standalone,
|
||||
metasrv_addr: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -280,6 +280,9 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Missing node id option in distributed mode"))]
|
||||
MissingNodeId { backtrace: Backtrace },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -344,6 +347,7 @@ impl ErrorExt for Error {
|
||||
Error::EmptyInsertBatch => StatusCode::InvalidArguments,
|
||||
Error::TableIdProviderNotFound { .. } => StatusCode::Unsupported,
|
||||
Error::BumpTableId { source, .. } => source.status_code(),
|
||||
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -37,7 +37,9 @@ use table_engine::config::EngineConfig as TableEngineConfig;
|
||||
use table_engine::engine::MitoEngine;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
use crate::error::{self, CatalogSnafu, MetaClientInitSnafu, NewCatalogSnafu, Result};
|
||||
use crate::error::{
|
||||
self, CatalogSnafu, MetaClientInitSnafu, MissingNodeIdSnafu, NewCatalogSnafu, Result,
|
||||
};
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
use crate::script::ScriptExecutor;
|
||||
use crate::server::grpc::plan::PhysicalPlanner;
|
||||
@@ -72,7 +74,11 @@ impl Instance {
|
||||
let meta_client = match opts.mode {
|
||||
Mode::Standalone => None,
|
||||
Mode::Distributed => {
|
||||
let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?;
|
||||
let meta_client = new_metasrv_client(
|
||||
opts.node_id.context(MissingNodeIdSnafu)?,
|
||||
&opts.meta_client_opts,
|
||||
)
|
||||
.await?;
|
||||
Some(Arc::new(meta_client))
|
||||
}
|
||||
};
|
||||
@@ -106,7 +112,7 @@ impl Instance {
|
||||
Mode::Distributed => {
|
||||
let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new(
|
||||
table_engine.clone(),
|
||||
opts.node_id,
|
||||
opts.node_id.context(MissingNodeIdSnafu)?,
|
||||
Arc::new(MetaKvBackend {
|
||||
client: meta_client.as_ref().unwrap().clone(),
|
||||
}),
|
||||
@@ -123,7 +129,7 @@ impl Instance {
|
||||
let heartbeat_task = match opts.mode {
|
||||
Mode::Standalone => None,
|
||||
Mode::Distributed => Some(HeartbeatTask::new(
|
||||
opts.node_id, /*node id not set*/
|
||||
opts.node_id.context(MissingNodeIdSnafu)?,
|
||||
opts.rpc_addr.clone(),
|
||||
meta_client.as_ref().unwrap().clone(),
|
||||
)),
|
||||
|
||||
@@ -91,7 +91,7 @@ impl Instance {
|
||||
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
|
||||
let object_store = new_object_store(&opts.storage).await?;
|
||||
let log_store = create_local_file_log_store(opts).await?;
|
||||
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id).await);
|
||||
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
|
||||
let table_engine = Arc::new(DefaultEngine::new(
|
||||
TableEngineConfig::default(),
|
||||
EngineImpl::new(
|
||||
@@ -105,7 +105,7 @@ impl Instance {
|
||||
// create remote catalog manager
|
||||
let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new(
|
||||
table_engine.clone(),
|
||||
opts.node_id,
|
||||
opts.node_id.unwrap_or(42),
|
||||
Arc::new(MetaKvBackend {
|
||||
client: meta_client.clone(),
|
||||
}),
|
||||
@@ -116,8 +116,11 @@ impl Instance {
|
||||
let script_executor =
|
||||
ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?;
|
||||
|
||||
let heartbeat_task =
|
||||
HeartbeatTask::new(opts.node_id, opts.rpc_addr.clone(), meta_client.clone());
|
||||
let heartbeat_task = HeartbeatTask::new(
|
||||
opts.node_id.unwrap_or(42),
|
||||
opts.rpc_addr.clone(),
|
||||
meta_client.clone(),
|
||||
);
|
||||
Ok(Self {
|
||||
query_engine: query_engine.clone(),
|
||||
sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()),
|
||||
|
||||
@@ -944,7 +944,7 @@ mod test {
|
||||
let data_tmp_dir =
|
||||
TempDir::new_in("/tmp", &format!("dist_table_test-data-{}", current)).unwrap();
|
||||
let opts = DatanodeOptions {
|
||||
node_id: datanode_id,
|
||||
node_id: Some(datanode_id),
|
||||
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
storage: ObjectStoreConfig::File {
|
||||
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
|
||||
Reference in New Issue
Block a user