From a779cb36ecc0a472a21e2abb7f20ac0b59171242 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Tue, 25 Jun 2024 23:13:07 -0700 Subject: [PATCH] fix: wrong frontend registration address (#4199) * fix: frontend registration address is wrong, #4186 * fix: license header * chore: adds hostname to frontend grpc * fix: forgot run make config-docs * chore: warn when using bind_addr * fix: flow node heartbeat carrying address --- config/config.md | 3 +- config/datanode.example.toml | 3 +- config/frontend.example.toml | 3 ++ src/datanode/src/heartbeat.rs | 49 +++-------------------------- src/flow/src/heartbeat.rs | 7 +++-- src/frontend/src/heartbeat.rs | 7 +++-- src/servers/src/addrs.rs | 58 +++++++++++++++++++++++++++++++++++ src/servers/src/lib.rs | 1 + 8 files changed, 78 insertions(+), 53 deletions(-) create mode 100644 src/servers/src/addrs.rs diff --git a/config/config.md b/config/config.md index daa0632e1b..2955e38024 100644 --- a/config/config.md +++ b/config/config.md @@ -170,6 +170,7 @@ | `http.body_limit` | String | `64MB` | HTTP request body limit.
Support the following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`. | | `grpc` | -- | -- | The gRPC server options. | | `grpc.addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. | +| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,
and used for connections from outside the host | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. | | `grpc.tls.mode` | String | `disable` | TLS mode. | @@ -314,7 +315,7 @@ | `rpc_max_send_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_send_message_size` instead. | | `grpc` | -- | -- | The gRPC server options. | | `grpc.addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. | -| `grpc.hostname` | String | `127.0.0.1` | The hostname to advertise to the metasrv. | +| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,
and used for connections from outside the host | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 5f64573d78..b3be8b5836 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -43,7 +43,8 @@ rpc_max_send_message_size = "512MB" [grpc] ## The address to bind the gRPC server. addr = "127.0.0.1:3001" -## The hostname to advertise to the metasrv. +## The hostname advertised to the metasrv, +## and used for connections from outside the host hostname = "127.0.0.1" ## The number of server worker threads. runtime_size = 8 diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 7ac52a672f..c9c1c28cf1 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -36,6 +36,9 @@ body_limit = "64MB" [grpc] ## The address to bind the gRPC server. addr = "127.0.0.1:4001" +## The hostname advertised to the metasrv, +## and used for connections from outside the host +hostname = "127.0.0.1" ## The number of server worker threads. runtime_size = 8 diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index b3d4baea76..ff872b7959 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -28,6 +28,7 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info, trace, warn}; use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder}; use meta_client::MetaClientOptions; +use servers::addrs; use snafu::ResultExt; use tokio::sync::{mpsc, Notify}; use tokio::time::Instant; @@ -47,8 +48,7 @@ pub(crate) mod task_tracker; pub struct HeartbeatTask { node_id: u64, node_epoch: u64, - server_addr: String, - server_hostname: Option, + peer_addr: String, running: Arc, meta_client: Arc, region_server: RegionServer, @@ -84,8 +84,7 @@ impl HeartbeatTask { node_id: opts.node_id.unwrap_or(0), // We use datanode's start time millis as the node's epoch. node_epoch: common_time::util::current_time_millis() as u64, - server_addr: opts.grpc.addr.clone(), - server_hostname: Some(opts.grpc.hostname.clone()), + peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)), running: Arc::new(AtomicBool::new(false)), meta_client: Arc::new(meta_client), region_server, @@ -183,7 +182,7 @@ impl HeartbeatTask { let interval = self.interval; let node_id = self.node_id; let node_epoch = self.node_epoch; - let addr = resolve_addr(&self.server_addr, &self.server_hostname); + let addr = &self.peer_addr; info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."); let meta_client = self.meta_client.clone(); @@ -350,25 +349,6 @@ impl HeartbeatTask { } } -/// Resolves hostname:port address for meta registration -/// -fn resolve_addr(bind_addr: &str, hostname_addr: &Option) -> String { - match hostname_addr { - Some(hostname_addr) => { - // it has port configured - if hostname_addr.contains(':') { - hostname_addr.clone() - } else { - // otherwise, resolve port from bind_addr - // should be safe to unwrap here because bind_addr is already validated - let port = bind_addr.split(':').nth(1).unwrap(); - format!("{hostname_addr}:{port}") - } - } - None => bind_addr.to_owned(), - } -} - /// Create metasrv client instance and spawn heartbeat loop. pub async fn new_metasrv_client( node_id: u64, @@ -404,24 +384,3 @@ pub async fn new_metasrv_client( .context(MetaClientInitSnafu)?; Ok(meta_client) } - -#[cfg(test)] -mod tests { - #[test] - fn test_resolve_addr() { - assert_eq!( - "tomcat:3001", - super::resolve_addr("127.0.0.1:3001", &Some("tomcat".to_owned())) - ); - - assert_eq!( - "tomcat:3002", - super::resolve_addr("127.0.0.1:3001", &Some("tomcat:3002".to_owned())) - ); - - assert_eq!( - "127.0.0.1:3001", - super::resolve_addr("127.0.0.1:3001", &None) - ); - } -} diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 4777fe9849..e46769aeb6 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -25,6 +25,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMess use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info}; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; +use servers::addrs; use servers::heartbeat_options::HeartbeatOptions; use snafu::ResultExt; use tokio::sync::mpsc; @@ -37,7 +38,7 @@ use crate::{Error, FlownodeOptions}; #[derive(Clone)] pub struct HeartbeatTask { node_id: u64, - server_addr: String, + peer_addr: String, meta_client: Arc, report_interval: Duration, retry_interval: Duration, @@ -53,7 +54,7 @@ impl HeartbeatTask { ) -> Self { Self { node_id: opts.node_id.unwrap_or(0), - server_addr: opts.grpc.addr.clone(), + peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)), meta_client, report_interval: heartbeat_opts.interval, retry_interval: heartbeat_opts.retry_interval, @@ -110,7 +111,7 @@ impl HeartbeatTask { let report_interval = self.report_interval; let self_peer = Some(Peer { id: self.node_id, - addr: self.server_addr.clone(), + addr: self.peer_addr.clone(), }); common_runtime::spawn_hb(async move { diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 0ccbed35a6..48c09bacd5 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -22,6 +22,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMess use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info}; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; +use servers::addrs; use servers::heartbeat_options::HeartbeatOptions; use snafu::ResultExt; use tokio::sync::mpsc; @@ -37,7 +38,7 @@ pub mod handler; /// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background. #[derive(Clone)] pub struct HeartbeatTask { - server_addr: String, + peer_addr: String, meta_client: Arc, report_interval: u64, retry_interval: u64, @@ -53,7 +54,7 @@ impl HeartbeatTask { resp_handler_executor: HeartbeatResponseHandlerExecutorRef, ) -> Self { HeartbeatTask { - server_addr: opts.grpc.addr.clone(), + peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)), meta_client, report_interval: heartbeat_opts.interval.as_millis() as u64, retry_interval: heartbeat_opts.retry_interval.as_millis() as u64, @@ -129,7 +130,7 @@ impl HeartbeatTask { let self_peer = Some(Peer { // The peer id doesn't make sense for frontend, so we just set it 0. id: 0, - addr: self.server_addr.clone(), + addr: self.peer_addr.clone(), }); common_runtime::spawn_hb(async move { diff --git a/src/servers/src/addrs.rs b/src/servers/src/addrs.rs new file mode 100644 index 0000000000..c43ba4c32f --- /dev/null +++ b/src/servers/src/addrs.rs @@ -0,0 +1,58 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::warn; + +/// Resolves hostname:port address for meta registration. +/// If `hostname_addr` is present, prefer to use it, `bind_addr` otherwise. +pub fn resolve_addr(bind_addr: &str, hostname_addr: Option<&str>) -> String { + match hostname_addr { + Some(hostname_addr) => { + // it has port configured + if hostname_addr.contains(':') { + hostname_addr.to_string() + } else { + // otherwise, resolve port from bind_addr + // should be safe to unwrap here because bind_addr is already validated + let port = bind_addr.split(':').nth(1).unwrap(); + format!("{hostname_addr}:{port}") + } + } + None => { + warn!("hostname not set, using bind_addr: {bind_addr} instead."); + bind_addr.to_string() + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test_resolve_addr() { + assert_eq!( + "tomcat:3001", + super::resolve_addr("127.0.0.1:3001", Some("tomcat")) + ); + + assert_eq!( + "tomcat:3002", + super::resolve_addr("127.0.0.1:3001", Some("tomcat:3002")) + ); + + assert_eq!( + "127.0.0.1:3001", + super::resolve_addr("127.0.0.1:3001", None) + ); + } +} diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index da7490a16d..85ebea4ca8 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -20,6 +20,7 @@ use datatypes::schema::Schema; use query::plan::LogicalPlan; +pub mod addrs; pub mod configurator; pub mod error; pub mod export_metrics;