mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: add --rpc-hostname option to datanode for a persist address to store in meta (#871)
* feat: add --rpc-hostname option * fix: config file and hostname parsing * Apply suggestions from code review Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
node_id = 42
|
||||
mode = 'distributed'
|
||||
rpc_addr = '127.0.0.1:3001'
|
||||
rpc_hostname = '127.0.0.1'
|
||||
rpc_runtime_size = 8
|
||||
mysql_addr = '127.0.0.1:4406'
|
||||
mysql_runtime_size = 4
|
||||
|
||||
@@ -54,6 +54,8 @@ struct StartCommand {
|
||||
#[clap(long)]
|
||||
rpc_addr: Option<String>,
|
||||
#[clap(long)]
|
||||
rpc_hostname: Option<String>,
|
||||
#[clap(long)]
|
||||
mysql_addr: Option<String>,
|
||||
#[clap(long)]
|
||||
metasrv_addr: Option<String>,
|
||||
@@ -94,6 +96,11 @@ impl TryFrom<StartCommand> for DatanodeOptions {
|
||||
if let Some(addr) = cmd.rpc_addr {
|
||||
opts.rpc_addr = addr;
|
||||
}
|
||||
|
||||
if cmd.rpc_hostname.is_some() {
|
||||
opts.rpc_hostname = cmd.rpc_hostname;
|
||||
}
|
||||
|
||||
if let Some(addr) = cmd.mysql_addr {
|
||||
opts.mysql_addr = addr;
|
||||
}
|
||||
|
||||
@@ -84,6 +84,7 @@ impl Default for WalConfig {
|
||||
pub struct DatanodeOptions {
|
||||
pub node_id: Option<u64>,
|
||||
pub rpc_addr: String,
|
||||
pub rpc_hostname: Option<String>,
|
||||
pub rpc_runtime_size: usize,
|
||||
pub mysql_addr: String,
|
||||
pub mysql_runtime_size: usize,
|
||||
@@ -99,6 +100,7 @@ impl Default for DatanodeOptions {
|
||||
Self {
|
||||
node_id: None,
|
||||
rpc_addr: "127.0.0.1:3001".to_string(),
|
||||
rpc_hostname: None,
|
||||
rpc_runtime_size: 8,
|
||||
mysql_addr: "127.0.0.1:4406".to_string(),
|
||||
mysql_runtime_size: 2,
|
||||
|
||||
@@ -27,6 +27,7 @@ use crate::error::{MetaClientInitSnafu, Result};
|
||||
pub struct HeartbeatTask {
|
||||
node_id: u64,
|
||||
server_addr: String,
|
||||
server_hostname: Option<String>,
|
||||
running: Arc<AtomicBool>,
|
||||
meta_client: Arc<MetaClient>,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
@@ -44,12 +45,14 @@ impl HeartbeatTask {
|
||||
pub fn new(
|
||||
node_id: u64,
|
||||
server_addr: String,
|
||||
server_hostname: Option<String>,
|
||||
meta_client: Arc<MetaClient>,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
node_id,
|
||||
server_addr,
|
||||
server_hostname,
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
meta_client,
|
||||
catalog_manager,
|
||||
@@ -96,7 +99,7 @@ impl HeartbeatTask {
|
||||
}
|
||||
let interval = self.interval;
|
||||
let node_id = self.node_id;
|
||||
let server_addr = self.server_addr.clone();
|
||||
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
|
||||
let meta_client = self.meta_client.clone();
|
||||
|
||||
let catalog_manager_clone = self.catalog_manager.clone();
|
||||
@@ -114,7 +117,7 @@ impl HeartbeatTask {
|
||||
let req = HeartbeatRequest {
|
||||
peer: Some(Peer {
|
||||
id: node_id,
|
||||
addr: server_addr.clone(),
|
||||
addr: addr.clone(),
|
||||
}),
|
||||
node_stat: Some(NodeStat {
|
||||
region_num,
|
||||
@@ -142,3 +145,43 @@ impl HeartbeatTask {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolves hostname:port address for meta registration
|
||||
///
|
||||
fn resolve_addr(bind_addr: &str, hostname_addr: &Option<String>) -> String {
|
||||
match hostname_addr {
|
||||
Some(hostname_addr) => {
|
||||
// it has port configured
|
||||
if hostname_addr.contains(':') {
|
||||
hostname_addr.clone()
|
||||
} else {
|
||||
// otherwise, resolve port from bind_addr
|
||||
// should be safe to unwrap here because bind_addr is already validated
|
||||
let port = bind_addr.split(':').nth(1).unwrap();
|
||||
format!("{hostname_addr}:{port}")
|
||||
}
|
||||
}
|
||||
None => bind_addr.to_owned(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn test_resolve_addr() {
|
||||
assert_eq!(
|
||||
"tomcat:3001",
|
||||
super::resolve_addr("127.0.0.1:3001", &Some("tomcat".to_owned()))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
"tomcat:3002",
|
||||
super::resolve_addr("127.0.0.1:3001", &Some("tomcat:3002".to_owned()))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
"127.0.0.1:3001",
|
||||
super::resolve_addr("127.0.0.1:3001", &None)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,6 +160,7 @@ impl Instance {
|
||||
Mode::Distributed => Some(HeartbeatTask::new(
|
||||
opts.node_id.context(MissingNodeIdSnafu)?,
|
||||
opts.rpc_addr.clone(),
|
||||
opts.rpc_hostname.clone(),
|
||||
meta_client.as_ref().unwrap().clone(),
|
||||
catalog_manager.clone(),
|
||||
)),
|
||||
|
||||
@@ -86,6 +86,7 @@ impl Instance {
|
||||
let heartbeat_task = HeartbeatTask::new(
|
||||
opts.node_id.unwrap_or(42),
|
||||
opts.rpc_addr.clone(),
|
||||
None,
|
||||
meta_client.clone(),
|
||||
catalog_manager.clone(),
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user