From ecb71f81bec61e33a235dead4bd9082b09ac3526 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Tue, 17 Jan 2023 10:50:50 +0800 Subject: [PATCH] feat: add --rpc-hostname option to datanode for a persist address to store in meta (#871) * feat: add --rpc-hostname option * fix: config file and hostname parsing * Apply suggestions from code review Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> --- config/datanode.example.toml | 1 + src/cmd/src/datanode.rs | 7 ++++++ src/datanode/src/datanode.rs | 2 ++ src/datanode/src/heartbeat.rs | 47 +++++++++++++++++++++++++++++++++-- src/datanode/src/instance.rs | 1 + src/datanode/src/mock.rs | 1 + 6 files changed, 57 insertions(+), 2 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 6b5c0de8f2..5fc00df1b8 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -1,6 +1,7 @@ node_id = 42 mode = 'distributed' rpc_addr = '127.0.0.1:3001' +rpc_hostname = '127.0.0.1' rpc_runtime_size = 8 mysql_addr = '127.0.0.1:4406' mysql_runtime_size = 4 diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 299f674437..b431d30913 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -54,6 +54,8 @@ struct StartCommand { #[clap(long)] rpc_addr: Option, #[clap(long)] + rpc_hostname: Option, + #[clap(long)] mysql_addr: Option, #[clap(long)] metasrv_addr: Option, @@ -94,6 +96,11 @@ impl TryFrom for DatanodeOptions { if let Some(addr) = cmd.rpc_addr { opts.rpc_addr = addr; } + + if cmd.rpc_hostname.is_some() { + opts.rpc_hostname = cmd.rpc_hostname; + } + if let Some(addr) = cmd.mysql_addr { opts.mysql_addr = addr; } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index f3827da530..e2d385fde8 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -84,6 +84,7 @@ impl Default for WalConfig { pub struct DatanodeOptions { pub node_id: Option, pub rpc_addr: String, + pub rpc_hostname: Option, pub rpc_runtime_size: usize, pub mysql_addr: String, pub mysql_runtime_size: usize, @@ -99,6 +100,7 @@ impl Default for DatanodeOptions { Self { node_id: None, rpc_addr: "127.0.0.1:3001".to_string(), + rpc_hostname: None, rpc_runtime_size: 8, mysql_addr: "127.0.0.1:4406".to_string(), mysql_runtime_size: 2, diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index c791b8e69e..904471f5b6 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -27,6 +27,7 @@ use crate::error::{MetaClientInitSnafu, Result}; pub struct HeartbeatTask { node_id: u64, server_addr: String, + server_hostname: Option, running: Arc, meta_client: Arc, catalog_manager: CatalogManagerRef, @@ -44,12 +45,14 @@ impl HeartbeatTask { pub fn new( node_id: u64, server_addr: String, + server_hostname: Option, meta_client: Arc, catalog_manager: CatalogManagerRef, ) -> Self { Self { node_id, server_addr, + server_hostname, running: Arc::new(AtomicBool::new(false)), meta_client, catalog_manager, @@ -96,7 +99,7 @@ impl HeartbeatTask { } let interval = self.interval; let node_id = self.node_id; - let server_addr = self.server_addr.clone(); + let addr = resolve_addr(&self.server_addr, &self.server_hostname); let meta_client = self.meta_client.clone(); let catalog_manager_clone = self.catalog_manager.clone(); @@ -114,7 +117,7 @@ impl HeartbeatTask { let req = HeartbeatRequest { peer: Some(Peer { id: node_id, - addr: server_addr.clone(), + addr: addr.clone(), }), node_stat: Some(NodeStat { region_num, @@ -142,3 +145,43 @@ impl HeartbeatTask { Ok(()) } } + +/// Resolves hostname:port address for meta registration +/// +fn resolve_addr(bind_addr: &str, hostname_addr: &Option) -> String { + match hostname_addr { + Some(hostname_addr) => { + // it has port configured + if hostname_addr.contains(':') { + hostname_addr.clone() + } else { + // otherwise, resolve port from bind_addr + // should be safe to unwrap here because bind_addr is already validated + let port = bind_addr.split(':').nth(1).unwrap(); + format!("{hostname_addr}:{port}") + } + } + None => bind_addr.to_owned(), + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test_resolve_addr() { + assert_eq!( + "tomcat:3001", + super::resolve_addr("127.0.0.1:3001", &Some("tomcat".to_owned())) + ); + + assert_eq!( + "tomcat:3002", + super::resolve_addr("127.0.0.1:3001", &Some("tomcat:3002".to_owned())) + ); + + assert_eq!( + "127.0.0.1:3001", + super::resolve_addr("127.0.0.1:3001", &None) + ); + } +} diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index b2f54ef5ea..2c89d74024 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -160,6 +160,7 @@ impl Instance { Mode::Distributed => Some(HeartbeatTask::new( opts.node_id.context(MissingNodeIdSnafu)?, opts.rpc_addr.clone(), + opts.rpc_hostname.clone(), meta_client.as_ref().unwrap().clone(), catalog_manager.clone(), )), diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index ec4304e53a..3d0e4bd6ea 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -86,6 +86,7 @@ impl Instance { let heartbeat_task = HeartbeatTask::new( opts.node_id.unwrap_or(42), opts.rpc_addr.clone(), + None, meta_client.clone(), catalog_manager.clone(), );