diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 692c120b64..c77a17b2b4 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -93,31 +93,20 @@ impl TryFrom for DatanodeOptions { opts.mysql_addr = addr; } - match (cmd.metasrv_addr, cmd.node_id) { - (Some(meta_addr), Some(node_id)) => { - // Running mode is only set to Distributed when - // both metasrv addr and node id are set in - // commandline options - opts.meta_client_opts.metasrv_addr = meta_addr.clone(); - opts.node_id = node_id; - opts.metasrv_addr = Some(vec![meta_addr]); - opts.mode = Mode::Distributed; - } - (None, None) => { - opts.mode = Mode::Standalone; - } - (None, Some(_)) => { - return MissingConfigSnafu { - msg: "Missing metasrv address option", - } - .fail(); - } - (Some(_), None) => { - return MissingConfigSnafu { - msg: "Missing node id option", - } - .fail(); + if let Some(node_id) = cmd.node_id { + opts.node_id = Some(node_id); + } + + if let Some(meta_addr) = cmd.metasrv_addr { + opts.meta_client_opts.metasrv_addr = meta_addr; + opts.mode = Mode::Distributed; + } + + if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) { + return MissingConfigSnafu { + msg: "Missing node id option", } + .fail(); } Ok(opts) } @@ -198,13 +187,32 @@ mod tests { config_file: None, }) .is_err()); - assert!(DatanodeOptions::try_from(StartCommand { + + // Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value + DatanodeOptions::try_from(StartCommand { node_id: Some(42), rpc_addr: None, mysql_addr: None, metasrv_addr: None, config_file: None, }) - .is_err()); + .unwrap(); + } + + #[test] + fn test_merge_config() { + let dn_opts = DatanodeOptions::try_from(StartCommand { + node_id: None, + rpc_addr: None, + mysql_addr: None, + metasrv_addr: None, + config_file: Some(format!( + "{}/../../config/datanode.example.toml", + std::env::current_dir().unwrap().as_path().to_str().unwrap() + )), + }) + .unwrap(); + assert_eq!(Some(42), dn_opts.node_id); + assert_eq!("1.1.1.1:3002", dn_opts.meta_client_opts.metasrv_addr); } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 3eb9a967dc..0d0707adc3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -39,7 +39,7 @@ impl Default for ObjectStoreConfig { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DatanodeOptions { - pub node_id: u64, + pub node_id: Option, pub rpc_addr: String, pub rpc_runtime_size: usize, pub mysql_addr: String, @@ -48,13 +48,12 @@ pub struct DatanodeOptions { pub wal_dir: String, pub storage: ObjectStoreConfig, pub mode: Mode, - pub metasrv_addr: Option>, } impl Default for DatanodeOptions { fn default() -> Self { Self { - node_id: 0, + node_id: None, rpc_addr: "127.0.0.1:3001".to_string(), rpc_runtime_size: 8, mysql_addr: "127.0.0.1:3306".to_string(), @@ -63,7 +62,6 @@ impl Default for DatanodeOptions { wal_dir: "/tmp/greptimedb/wal".to_string(), storage: ObjectStoreConfig::default(), mode: Mode::Standalone, - metasrv_addr: None, } } } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index d837b7799d..03e5c0e64f 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -280,6 +280,9 @@ pub enum Error { #[snafu(backtrace)] source: datatypes::error::Error, }, + + #[snafu(display("Missing node id option in distributed mode"))] + MissingNodeId { backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -344,6 +347,7 @@ impl ErrorExt for Error { Error::EmptyInsertBatch => StatusCode::InvalidArguments, Error::TableIdProviderNotFound { .. } => StatusCode::Unsupported, Error::BumpTableId { source, .. } => source.status_code(), + Error::MissingNodeId { .. } => StatusCode::InvalidArguments, } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 5095b9e91b..9cf336b446 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -37,7 +37,9 @@ use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; -use crate::error::{self, CatalogSnafu, MetaClientInitSnafu, NewCatalogSnafu, Result}; +use crate::error::{ + self, CatalogSnafu, MetaClientInitSnafu, MissingNodeIdSnafu, NewCatalogSnafu, Result, +}; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; use crate::server::grpc::plan::PhysicalPlanner; @@ -72,7 +74,11 @@ impl Instance { let meta_client = match opts.mode { Mode::Standalone => None, Mode::Distributed => { - let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?; + let meta_client = new_metasrv_client( + opts.node_id.context(MissingNodeIdSnafu)?, + &opts.meta_client_opts, + ) + .await?; Some(Arc::new(meta_client)) } }; @@ -106,7 +112,7 @@ impl Instance { Mode::Distributed => { let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new( table_engine.clone(), - opts.node_id, + opts.node_id.context(MissingNodeIdSnafu)?, Arc::new(MetaKvBackend { client: meta_client.as_ref().unwrap().clone(), }), @@ -123,7 +129,7 @@ impl Instance { let heartbeat_task = match opts.mode { Mode::Standalone => None, Mode::Distributed => Some(HeartbeatTask::new( - opts.node_id, /*node id not set*/ + opts.node_id.context(MissingNodeIdSnafu)?, opts.rpc_addr.clone(), meta_client.as_ref().unwrap().clone(), )), diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index d0ed8f6227..edbfe09c87 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -91,7 +91,7 @@ impl Instance { pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result { let object_store = new_object_store(&opts.storage).await?; let log_store = create_local_file_log_store(opts).await?; - let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id).await); + let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); let table_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), EngineImpl::new( @@ -105,7 +105,7 @@ impl Instance { // create remote catalog manager let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new( table_engine.clone(), - opts.node_id, + opts.node_id.unwrap_or(42), Arc::new(MetaKvBackend { client: meta_client.clone(), }), @@ -116,8 +116,11 @@ impl Instance { let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; - let heartbeat_task = - HeartbeatTask::new(opts.node_id, opts.rpc_addr.clone(), meta_client.clone()); + let heartbeat_task = HeartbeatTask::new( + opts.node_id.unwrap_or(42), + opts.rpc_addr.clone(), + meta_client.clone(), + ); Ok(Self { query_engine: query_engine.clone(), sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()), diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index da41bbe46c..903f44032a 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -944,7 +944,7 @@ mod test { let data_tmp_dir = TempDir::new_in("/tmp", &format!("dist_table_test-data-{}", current)).unwrap(); let opts = DatanodeOptions { - node_id: datanode_id, + node_id: Some(datanode_id), wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),