feat: Support automatic DNS lookup for kafka bootstrap servers

Author: JohnsonLee
Date:   2024-02-25 22:41:49 +08:00
parent 1f1d1b4f57
commit 5baed7b01d
8 changed files with 132 additions and 2 deletions
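
In short: hostnames in broker_endpoints (e.g. "localhost:9092") are now resolved to IPv4 addresses while the configuration is deserialized, via the new dns-lookup dependency. A minimal standalone sketch of the crate call this builds on (not code from this commit):

fn main() -> std::io::Result<()> {
    // dns_lookup::lookup_host performs a blocking, getaddrinfo-style query
    // and returns every address recorded for the name.
    let ips: Vec<std::net::IpAddr> = dns_lookup::lookup_host("localhost")?
        .into_iter()
        .filter(|addr| addr.is_ipv4()) // the commit keeps only IPv4 results
        .collect();
    println!("{:?}", ips); // typically [127.0.0.1]
    Ok(())
}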

Cargo.lock generated

@@ -2106,6 +2106,7 @@ version = "0.6.0"
 dependencies = [
  "common-base",
  "common-telemetry",
+ "dns-lookup 2.0.4",
  "futures-util",
  "humantime-serde",
  "rskafka",
@@ -3073,6 +3074,18 @@ dependencies = [
  "winapi",
 ]

+[[package]]
+name = "dns-lookup"
+version = "2.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc"
+dependencies = [
+ "cfg-if 1.0.0",
+ "libc",
+ "socket2 0.5.5",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "doc-comment"
 version = "0.3.3"
@@ -8370,7 +8383,7 @@ dependencies = [
  "crossbeam-utils",
  "csv-core",
  "digest",
- "dns-lookup",
+ "dns-lookup 1.0.8",
  "dyn-clone",
  "flate2",
  "gethostname",


@@ -99,6 +99,7 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.g
 datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
 datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
 derive_builder = "0.12"
+dns-lookup = "2.0"
 etcd-client = "0.12"
 fst = "0.4.7"
 futures = "0.3"


@@ -13,6 +13,7 @@ workspace = true
 [dependencies]
 common-base.workspace = true
 common-telemetry.workspace = true
+dns-lookup.workspace = true
 futures-util.workspace = true
 humantime-serde.workspace = true
 rskafka.workspace = true


@@ -147,6 +147,85 @@ mod tests {
         assert_eq!(datanode_wal_config, DatanodeWalConfig::RaftEngine(expected));
     }

+    // TODO: find a way to merge with the test below.
+    #[test]
+    fn test_toml_kafka_with_dnslookup() {
+        let toml_str = r#"
+            provider = "kafka"
+            broker_endpoints = ["localhost:9092"]
+            num_topics = 32
+            selector_type = "round_robin"
+            topic_name_prefix = "greptimedb_wal_topic"
+            replication_factor = 1
+            create_topic_timeout = "30s"
+            max_batch_size = "1MB"
+            linger = "200ms"
+            consumer_wait_timeout = "100ms"
+            backoff_init = "500ms"
+            backoff_max = "10s"
+            backoff_base = 2
+            backoff_deadline = "5mins"
+        "#;
+
+        // Deserialized to MetaSrvWalConfig.
+        let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap();
+        let expected = MetaSrvKafkaConfig {
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+            num_topics: 32,
+            selector_type: TopicSelectorType::RoundRobin,
+            topic_name_prefix: "greptimedb_wal_topic".to_string(),
+            num_partitions: 1,
+            replication_factor: 1,
+            create_topic_timeout: Duration::from_secs(30),
+            backoff: BackoffConfig {
+                init: Duration::from_millis(500),
+                max: Duration::from_secs(10),
+                base: 2,
+                deadline: Some(Duration::from_secs(60 * 5)),
+            },
+        };
+        assert_eq!(metasrv_wal_config, MetaSrvWalConfig::Kafka(expected));
+
+        // Deserialized to DatanodeWalConfig.
+        let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
+        let expected = DatanodeKafkaConfig {
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+            compression: Compression::default(),
+            max_batch_size: ReadableSize::mb(1),
+            linger: Duration::from_millis(200),
+            consumer_wait_timeout: Duration::from_millis(100),
+            backoff: BackoffConfig {
+                init: Duration::from_millis(500),
+                max: Duration::from_secs(10),
+                base: 2,
+                deadline: Some(Duration::from_secs(60 * 5)),
+            },
+        };
+        assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
+
+        // Deserialized to StandaloneWalConfig.
+        let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap();
+        let expected = StandaloneKafkaConfig {
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+            num_topics: 32,
+            selector_type: TopicSelectorType::RoundRobin,
+            topic_name_prefix: "greptimedb_wal_topic".to_string(),
+            num_partitions: 1,
+            replication_factor: 1,
+            create_topic_timeout: Duration::from_secs(30),
+            compression: Compression::default(),
+            max_batch_size: ReadableSize::mb(1),
+            linger: Duration::from_millis(200),
+            consumer_wait_timeout: Duration::from_millis(100),
+            backoff: BackoffConfig {
+                init: Duration::from_millis(500),
+                max: Duration::from_secs(10),
+                base: 2,
+                deadline: Some(Duration::from_secs(60 * 5)),
+            },
+        };
+        assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected));
+    }
+
     #[test]
     fn test_toml_kafka() {
         let toml_str = r#"
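
The new test feeds localhost:9092 through each of the three WAL config variants (metasrv, datanode, standalone) and expects the resolved 127.0.0.1:9092 back, exercising the deserializer added in the next file.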


@@ -14,7 +14,7 @@
 use std::time::Duration;

-use serde::{Deserialize, Serialize};
+use serde::{de, Deserialize, Serialize};
 use serde_with::with_prefix;

 with_prefix!(pub backoff_prefix "backoff_");
@@ -46,3 +46,36 @@ impl Default for BackoffConfig {
         }
     }
 }
+
+fn lookup_endpoint<'de, D>(endpoint: &str) -> Result<String, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    // An endpoint is expected in the form "host:port".
+    let Some((ip_or_domain, port)) = endpoint.rsplit_once(':') else {
+        return Err(de::Error::custom(format!("invalid endpoint: {}", endpoint)));
+    };
+    // Already an IP address; nothing to resolve.
+    if ip_or_domain.parse::<std::net::IpAddr>().is_ok() {
+        return Ok(endpoint.to_string());
+    }
+    // Resolve the domain name, keeping only IPv4 addresses.
+    let ips: Vec<_> = dns_lookup::lookup_host(ip_or_domain)
+        .map_err(de::Error::custom)?
+        .into_iter()
+        .filter(|addr| addr.is_ipv4())
+        .collect();
+    if ips.is_empty() {
+        return Err(de::Error::custom(format!(
+            "failed to resolve the domain name: {}",
+            ip_or_domain
+        )));
+    }
+    // Use the first resolved address.
+    Ok(format!("{}:{}", ips[0], port))
+}
+
+pub fn deserialize_broker_endpoints<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    let mut broker_endpoints: Vec<String> = Vec::deserialize(deserializer)?;
+    for endpoint in &mut broker_endpoints {
+        *endpoint = lookup_endpoint::<D>(endpoint)?;
+    }
+    Ok(broker_endpoints)
+}


@@ -26,6 +26,7 @@ use crate::BROKER_ENDPOINT;
 #[serde(default)]
 pub struct DatanodeKafkaConfig {
     /// The broker endpoints of the Kafka cluster.
+    #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")]
     pub broker_endpoints: Vec<String>,
     /// The compression algorithm used to compress kafka records.
     #[serde(skip)]


@@ -24,6 +24,7 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
 #[serde(default)]
 pub struct MetaSrvKafkaConfig {
     /// The broker endpoints of the Kafka cluster.
+    #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")]
     pub broker_endpoints: Vec<String>,
     /// The number of topics to be created upon start.
     pub num_topics: usize,


@@ -26,6 +26,7 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
 #[serde(default)]
 pub struct StandaloneKafkaConfig {
     /// The broker endpoints of the Kafka cluster.
+    #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")]
     pub broker_endpoints: Vec<String>,
     /// Number of topics to be created upon start.
     pub num_topics: usize,
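
One interaction worth noting: these structs are marked #[serde(default)], and serde only invokes a field's deserialize_with when the field is actually present in the input, so a config that omits broker_endpoints falls back to the struct default without triggering any DNS lookup.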