mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 18:00:41 +00:00
fix: wrong frontend registration address (#4199)
* fix: frontend registration address is wrong, #4186 * fix: license header * chore: adds hostname to frontend grpc * fix: forgot run make config-docs * chore: warn when using bind_addr * fix: flow node heartbeat carrying address
This commit is contained in:
@@ -170,6 +170,7 @@
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>Support the following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||
@@ -314,7 +315,7 @@
|
||||
| `rpc_max_send_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_send_message_size` instead. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
|
||||
| `grpc.hostname` | String | `127.0.0.1` | The hostname to advertise to the metasrv. |
|
||||
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||
|
||||
@@ -43,7 +43,8 @@ rpc_max_send_message_size = "512MB"
|
||||
[grpc]
|
||||
## The address to bind the gRPC server.
|
||||
addr = "127.0.0.1:3001"
|
||||
## The hostname to advertise to the metasrv.
|
||||
## The hostname advertised to the metasrv,
|
||||
## and used for connections from outside the host
|
||||
hostname = "127.0.0.1"
|
||||
## The number of server worker threads.
|
||||
runtime_size = 8
|
||||
|
||||
@@ -36,6 +36,9 @@ body_limit = "64MB"
|
||||
[grpc]
|
||||
## The address to bind the gRPC server.
|
||||
addr = "127.0.0.1:4001"
|
||||
## The hostname advertised to the metasrv,
|
||||
## and used for connections from outside the host
|
||||
hostname = "127.0.0.1"
|
||||
## The number of server worker threads.
|
||||
runtime_size = 8
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_telemetry::{debug, error, info, trace, warn};
|
||||
use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
|
||||
use meta_client::MetaClientOptions;
|
||||
use servers::addrs;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::{mpsc, Notify};
|
||||
use tokio::time::Instant;
|
||||
@@ -47,8 +48,7 @@ pub(crate) mod task_tracker;
|
||||
pub struct HeartbeatTask {
|
||||
node_id: u64,
|
||||
node_epoch: u64,
|
||||
server_addr: String,
|
||||
server_hostname: Option<String>,
|
||||
peer_addr: String,
|
||||
running: Arc<AtomicBool>,
|
||||
meta_client: Arc<MetaClient>,
|
||||
region_server: RegionServer,
|
||||
@@ -84,8 +84,7 @@ impl HeartbeatTask {
|
||||
node_id: opts.node_id.unwrap_or(0),
|
||||
// We use datanode's start time millis as the node's epoch.
|
||||
node_epoch: common_time::util::current_time_millis() as u64,
|
||||
server_addr: opts.grpc.addr.clone(),
|
||||
server_hostname: Some(opts.grpc.hostname.clone()),
|
||||
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
meta_client: Arc::new(meta_client),
|
||||
region_server,
|
||||
@@ -183,7 +182,7 @@ impl HeartbeatTask {
|
||||
let interval = self.interval;
|
||||
let node_id = self.node_id;
|
||||
let node_epoch = self.node_epoch;
|
||||
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
|
||||
let addr = &self.peer_addr;
|
||||
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
|
||||
|
||||
let meta_client = self.meta_client.clone();
|
||||
@@ -350,25 +349,6 @@ impl HeartbeatTask {
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolves hostname:port address for meta registration
|
||||
///
|
||||
fn resolve_addr(bind_addr: &str, hostname_addr: &Option<String>) -> String {
|
||||
match hostname_addr {
|
||||
Some(hostname_addr) => {
|
||||
// it has port configured
|
||||
if hostname_addr.contains(':') {
|
||||
hostname_addr.clone()
|
||||
} else {
|
||||
// otherwise, resolve port from bind_addr
|
||||
// should be safe to unwrap here because bind_addr is already validated
|
||||
let port = bind_addr.split(':').nth(1).unwrap();
|
||||
format!("{hostname_addr}:{port}")
|
||||
}
|
||||
}
|
||||
None => bind_addr.to_owned(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create metasrv client instance and spawn heartbeat loop.
|
||||
pub async fn new_metasrv_client(
|
||||
node_id: u64,
|
||||
@@ -404,24 +384,3 @@ pub async fn new_metasrv_client(
|
||||
.context(MetaClientInitSnafu)?;
|
||||
Ok(meta_client)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn test_resolve_addr() {
|
||||
assert_eq!(
|
||||
"tomcat:3001",
|
||||
super::resolve_addr("127.0.0.1:3001", &Some("tomcat".to_owned()))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
"tomcat:3002",
|
||||
super::resolve_addr("127.0.0.1:3001", &Some("tomcat:3002".to_owned()))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
"127.0.0.1:3001",
|
||||
super::resolve_addr("127.0.0.1:3001", &None)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMess
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
|
||||
use servers::addrs;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -37,7 +38,7 @@ use crate::{Error, FlownodeOptions};
|
||||
#[derive(Clone)]
|
||||
pub struct HeartbeatTask {
|
||||
node_id: u64,
|
||||
server_addr: String,
|
||||
peer_addr: String,
|
||||
meta_client: Arc<MetaClient>,
|
||||
report_interval: Duration,
|
||||
retry_interval: Duration,
|
||||
@@ -53,7 +54,7 @@ impl HeartbeatTask {
|
||||
) -> Self {
|
||||
Self {
|
||||
node_id: opts.node_id.unwrap_or(0),
|
||||
server_addr: opts.grpc.addr.clone(),
|
||||
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
|
||||
meta_client,
|
||||
report_interval: heartbeat_opts.interval,
|
||||
retry_interval: heartbeat_opts.retry_interval,
|
||||
@@ -110,7 +111,7 @@ impl HeartbeatTask {
|
||||
let report_interval = self.report_interval;
|
||||
let self_peer = Some(Peer {
|
||||
id: self.node_id,
|
||||
addr: self.server_addr.clone(),
|
||||
addr: self.peer_addr.clone(),
|
||||
});
|
||||
|
||||
common_runtime::spawn_hb(async move {
|
||||
|
||||
@@ -22,6 +22,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMess
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
|
||||
use servers::addrs;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -37,7 +38,7 @@ pub mod handler;
|
||||
/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
|
||||
#[derive(Clone)]
|
||||
pub struct HeartbeatTask {
|
||||
server_addr: String,
|
||||
peer_addr: String,
|
||||
meta_client: Arc<MetaClient>,
|
||||
report_interval: u64,
|
||||
retry_interval: u64,
|
||||
@@ -53,7 +54,7 @@ impl HeartbeatTask {
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
) -> Self {
|
||||
HeartbeatTask {
|
||||
server_addr: opts.grpc.addr.clone(),
|
||||
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
|
||||
meta_client,
|
||||
report_interval: heartbeat_opts.interval.as_millis() as u64,
|
||||
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
|
||||
@@ -129,7 +130,7 @@ impl HeartbeatTask {
|
||||
let self_peer = Some(Peer {
|
||||
// The peer id doesn't make sense for frontend, so we just set it 0.
|
||||
id: 0,
|
||||
addr: self.server_addr.clone(),
|
||||
addr: self.peer_addr.clone(),
|
||||
});
|
||||
|
||||
common_runtime::spawn_hb(async move {
|
||||
|
||||
58
src/servers/src/addrs.rs
Normal file
58
src/servers/src/addrs.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_telemetry::warn;
|
||||
|
||||
/// Resolves hostname:port address for meta registration.
|
||||
/// If `hostname_addr` is present, prefer to use it, `bind_addr` otherwise.
|
||||
pub fn resolve_addr(bind_addr: &str, hostname_addr: Option<&str>) -> String {
|
||||
match hostname_addr {
|
||||
Some(hostname_addr) => {
|
||||
// it has port configured
|
||||
if hostname_addr.contains(':') {
|
||||
hostname_addr.to_string()
|
||||
} else {
|
||||
// otherwise, resolve port from bind_addr
|
||||
// should be safe to unwrap here because bind_addr is already validated
|
||||
let port = bind_addr.split(':').nth(1).unwrap();
|
||||
format!("{hostname_addr}:{port}")
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warn!("hostname not set, using bind_addr: {bind_addr} instead.");
|
||||
bind_addr.to_string()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn test_resolve_addr() {
|
||||
assert_eq!(
|
||||
"tomcat:3001",
|
||||
super::resolve_addr("127.0.0.1:3001", Some("tomcat"))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
"tomcat:3002",
|
||||
super::resolve_addr("127.0.0.1:3001", Some("tomcat:3002"))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
"127.0.0.1:3001",
|
||||
super::resolve_addr("127.0.0.1:3001", None)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,7 @@
|
||||
use datatypes::schema::Schema;
|
||||
use query::plan::LogicalPlan;
|
||||
|
||||
pub mod addrs;
|
||||
pub mod configurator;
|
||||
pub mod error;
|
||||
pub mod export_metrics;
|
||||
|
||||
Reference in New Issue
Block a user