From 5baed7b01d0202ffa1b860cdc425de93e18eff03 Mon Sep 17 00:00:00 2001 From: JohnsonLee <0xjohnsonlee@gmail.com> Date: Sun, 25 Feb 2024 22:41:49 +0800 Subject: [PATCH] feat: Support automatic DNS lookup for kafka bootstrap servers --- Cargo.lock | 15 +++- Cargo.toml | 1 + src/common/wal/Cargo.toml | 1 + src/common/wal/src/config.rs | 79 +++++++++++++++++++ src/common/wal/src/config/kafka/common.rs | 35 +++++++- src/common/wal/src/config/kafka/datanode.rs | 1 + src/common/wal/src/config/kafka/metasrv.rs | 1 + src/common/wal/src/config/kafka/standalone.rs | 1 + 8 files changed, 132 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84d28103f3..4591bba5c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2106,6 +2106,7 @@ version = "0.6.0" dependencies = [ "common-base", "common-telemetry", + "dns-lookup 2.0.4", "futures-util", "humantime-serde", "rskafka", @@ -3073,6 +3074,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "dns-lookup" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "socket2 0.5.5", + "windows-sys 0.48.0", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -8370,7 +8383,7 @@ dependencies = [ "crossbeam-utils", "csv-core", "digest", - "dns-lookup", + "dns-lookup 1.0.8", "dyn-clone", "flate2", "gethostname", diff --git a/Cargo.toml b/Cargo.toml index 742681bb6a..98a5b1466e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,6 +99,7 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.g datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } derive_builder = "0.12" +dns-lookup = "2.0" etcd-client = "0.12" fst = "0.4.7" futures = "0.3" diff 
--git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index 3b84673bb1..ce10f40bd4 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] common-base.workspace = true common-telemetry.workspace = true +dns-lookup.workspace = true futures-util.workspace = true humantime-serde.workspace = true rskafka.workspace = true diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index a51335c199..4ae9594f09 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -147,6 +147,85 @@ mod tests { assert_eq!(datanode_wal_config, DatanodeWalConfig::RaftEngine(expected)); } + // TODO: find a way to merge with the test below. + #[test] + fn test_toml_kafka_with_dnslookup() { + let toml_str = r#" + provider = "kafka" + broker_endpoints = ["localhost:9092"] + num_topics = 32 + selector_type = "round_robin" + topic_name_prefix = "greptimedb_wal_topic" + replication_factor = 1 + create_topic_timeout = "30s" + max_batch_size = "1MB" + linger = "200ms" + consumer_wait_timeout = "100ms" + backoff_init = "500ms" + backoff_max = "10s" + backoff_base = 2 + backoff_deadline = "5mins" + "#; + + // Deserialized to MetaSrvWalConfig. + let metasrv_wal_config: MetaSrvWalConfig = toml::from_str(toml_str).unwrap(); + let expected = MetaSrvKafkaConfig { + broker_endpoints: vec!["127.0.0.1:9092".to_string()], + num_topics: 32, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_wal_topic".to_string(), + num_partitions: 1, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), + backoff: BackoffConfig { + init: Duration::from_millis(500), + max: Duration::from_secs(10), + base: 2, + deadline: Some(Duration::from_secs(60 * 5)), + }, + }; + assert_eq!(metasrv_wal_config, MetaSrvWalConfig::Kafka(expected)); + + // Deserialized to DatanodeWalConfig. 
+ let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); + let expected = DatanodeKafkaConfig { + broker_endpoints: vec!["127.0.0.1:9092".to_string()], + compression: Compression::default(), + max_batch_size: ReadableSize::mb(1), + linger: Duration::from_millis(200), + consumer_wait_timeout: Duration::from_millis(100), + backoff: BackoffConfig { + init: Duration::from_millis(500), + max: Duration::from_secs(10), + base: 2, + deadline: Some(Duration::from_secs(60 * 5)), + }, + }; + assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); + + // Deserialized to StandaloneWalConfig. + let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap(); + let expected = StandaloneKafkaConfig { + broker_endpoints: vec!["127.0.0.1:9092".to_string()], + num_topics: 32, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_wal_topic".to_string(), + num_partitions: 1, + replication_factor: 1, + create_topic_timeout: Duration::from_secs(30), + compression: Compression::default(), + max_batch_size: ReadableSize::mb(1), + linger: Duration::from_millis(200), + consumer_wait_timeout: Duration::from_millis(100), + backoff: BackoffConfig { + init: Duration::from_millis(500), + max: Duration::from_secs(10), + base: 2, + deadline: Some(Duration::from_secs(60 * 5)), + }, + }; + assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected)); + } #[test] fn test_toml_kafka() { let toml_str = r#" diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index ea708d9615..b48e6af453 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -14,7 +14,7 @@ use std::time::Duration; -use serde::{Deserialize, Serialize}; +use serde::{de, Deserialize, Serialize}; use serde_with::with_prefix; with_prefix!(pub backoff_prefix "backoff_"); @@ -46,3 +46,36 @@ impl Default for BackoffConfig { } } } +fn lookup_endpoint<'de, 
D>(endpoint: &str) -> Result +where + D: serde::Deserializer<'de>, +{ + let mut iter = endpoint.split(':'); + let ip_or_domain: &str = iter.next().unwrap(); + let port: &str = iter.next().unwrap(); + if ip_or_domain.parse::().is_ok() { + return Ok(endpoint.to_string()); + } + let ips: Vec<_> = dns_lookup::lookup_host(ip_or_domain) + .map_err(de::Error::custom)? + .into_iter() + .filter(|addr| addr.is_ipv4()) + .collect(); + if ips.is_empty() { + return Err(de::Error::custom(format!( + "failed to resolve the domain name: {}", + ip_or_domain + ))); + } + Ok(format!("{}:{}", ips[0], port)) +} +pub fn deserialize_broker_endpoints<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let mut broker_endpoints: Vec = Vec::deserialize(deserializer)?; + for endpoint in &mut broker_endpoints { + *endpoint = lookup_endpoint::(endpoint)?; + } + Ok(broker_endpoints) +} diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index b15d13dffc..dd35781973 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -26,6 +26,7 @@ use crate::BROKER_ENDPOINT; #[serde(default)] pub struct DatanodeKafkaConfig { /// The broker endpoints of the Kafka cluster. + #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")] pub broker_endpoints: Vec, /// The compression algorithm used to compress kafka records. #[serde(skip)] diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index a8989275f4..5705dd71a6 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -24,6 +24,7 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; #[serde(default)] pub struct MetaSrvKafkaConfig { /// The broker endpoints of the Kafka cluster. 
+ #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")] pub broker_endpoints: Vec, /// The number of topics to be created upon start. pub num_topics: usize, diff --git a/src/common/wal/src/config/kafka/standalone.rs b/src/common/wal/src/config/kafka/standalone.rs index 3da8fa4980..bd34b919d8 100644 --- a/src/common/wal/src/config/kafka/standalone.rs +++ b/src/common/wal/src/config/kafka/standalone.rs @@ -26,6 +26,7 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; #[serde(default)] pub struct StandaloneKafkaConfig { /// The broker endpoints of the Kafka cluster. + #[serde(deserialize_with = "super::common::deserialize_broker_endpoints")] pub broker_endpoints: Vec, /// Number of topics to be created upon start. pub num_topics: usize,