refactor: Remove the StandaloneKafkaConfig struct (#4253)

* refactor: Remove the StandaloneKafkaConfig struct

* remove the redundant assignment

* remove redundant struct

* simplify replication_factor

* add KafkaTopicConfig

* fix check

* fix check

* fix check

* add flatten with

* revert config.md

* fix test params

* fix test param

* fix missing params when provider is kafka

* remove unused files

* remove with prefix

* fix doc

* fix test

* fix clippy
irenjj
2024-07-12 16:17:18 +08:00
committed by GitHub
parent 05c7d3eb42
commit 9f2d53c3df
11 changed files with 150 additions and 140 deletions
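At a glance: the topic-level Kafka settings (number of topics, partitions, selector type, replication factor, creation timeout, name prefix) that MetasrvKafkaConfig and StandaloneKafkaConfig each carried are consolidated into a single KafkaTopicConfig, and standalone mode now reuses DatanodeWalConfig directly. A minimal sketch of the resulting call-site shape, mirroring the updated tests in this diff (topic name illustrative):

    use common_wal::config::kafka::common::KafkaTopicConfig;
    use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};

    fn main() {
        // Topic settings are written once and shared by both sides.
        let kafka_topic = KafkaTopicConfig {
            num_topics: 3,
            topic_name_prefix: "my_wal_topic".to_string(),
            ..Default::default()
        };
        let metasrv = MetasrvKafkaConfig {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            kafka_topic: kafka_topic.clone(),
            ..Default::default()
        };
        let datanode = DatanodeKafkaConfig {
            broker_endpoints: metasrv.broker_endpoints.clone(),
            kafka_topic,
            ..Default::default()
        };
        // Both configs now agree on the same nested topic settings.
        assert_eq!(metasrv.kafka_topic, datanode.kafka_topic);
    }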

View File

@@ -40,7 +40,7 @@ use common_telemetry::info;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
-use common_wal::config::StandaloneWalConfig;
+use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
@@ -130,7 +130,7 @@ pub struct StandaloneOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
-pub wal: StandaloneWalConfig,
+pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
@@ -155,7 +155,7 @@ impl Default for StandaloneOptions {
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
prom_store: PromStoreOptions::default(),
-wal: StandaloneWalConfig::default(),
+wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
@@ -204,7 +204,7 @@ impl StandaloneOptions {
DatanodeOptions {
node_id: Some(0),
enable_telemetry: cloned_opts.enable_telemetry,
-wal: cloned_opts.wal.into(),
+wal: cloned_opts.wal,
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
grpc: cloned_opts.grpc,

View File

@@ -24,7 +24,7 @@ use common_grpc::channel_manager::{
use common_runtime::global::RuntimeOptions;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::raft_engine::RaftEngineConfig;
-use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig};
+use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use file_engine::config::EngineConfig;
use frontend::frontend::FrontendOptions;
@@ -206,7 +206,7 @@ fn test_load_standalone_example_config() {
},
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
-wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig {
+wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
..Default::default()

View File

@@ -123,6 +123,7 @@ pub fn prepare_wal_options(
#[cfg(test)]
mod tests {
+use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
@@ -160,9 +161,13 @@ mod tests {
.collect::<Vec<_>>();
// Creates a topic manager.
-let config = MetasrvKafkaConfig {
+let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
+..Default::default()
+};
+let config = MetasrvKafkaConfig {
broker_endpoints,
+kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;

View File

@@ -56,11 +56,11 @@ impl TopicManager {
/// Creates a new topic manager.
pub fn new(config: MetasrvKafkaConfig, kv_backend: KvBackendRef) -> Self {
// Topics should be created.
-let topics = (0..config.num_topics)
-.map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
+let topics = (0..config.kafka_topic.num_topics)
+.map(|topic_id| format!("{}_{topic_id}", config.kafka_topic.topic_name_prefix))
.collect::<Vec<_>>();
-let selector = match config.selector_type {
+let selector = match config.kafka_topic.selector_type {
TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(),
};
@@ -76,7 +76,7 @@ impl TopicManager {
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
pub async fn start(&self) -> Result<()> {
-let num_topics = self.config.num_topics;
+let num_topics = self.config.kafka_topic.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
// Topics should be created.
@@ -185,9 +185,9 @@ impl TopicManager {
match client
.create_topic(
topic.clone(),
-self.config.num_partitions,
-self.config.replication_factor,
-self.config.create_topic_timeout.as_millis() as i32,
+self.config.kafka_topic.num_partitions,
+self.config.kafka_topic.replication_factor,
+self.config.kafka_topic.create_topic_timeout.as_millis() as i32,
)
.await
{
@@ -242,6 +242,7 @@ impl TopicManager {
#[cfg(test)]
mod tests {
+use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
@@ -283,9 +284,13 @@ mod tests {
.collect::<Vec<_>>();
// Creates a topic manager.
-let config = MetasrvKafkaConfig {
+let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
+..Default::default()
+};
+let config = MetasrvKafkaConfig {
broker_endpoints,
+kafka_topic,
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;

View File

@@ -17,7 +17,7 @@ pub mod raft_engine;
use serde::{Deserialize, Serialize};
-use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
+use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
/// Wal configurations for metasrv.
@@ -43,80 +43,43 @@ impl Default for DatanodeWalConfig {
}
}
-/// Wal configurations for standalone.
-#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
-#[serde(tag = "provider", rename_all = "snake_case")]
-pub enum StandaloneWalConfig {
-RaftEngine(RaftEngineConfig),
-Kafka(StandaloneKafkaConfig),
-}
-impl Default for StandaloneWalConfig {
-fn default() -> Self {
-Self::RaftEngine(RaftEngineConfig::default())
-}
-}
-impl From<StandaloneWalConfig> for MetasrvWalConfig {
-fn from(config: StandaloneWalConfig) -> Self {
+impl From<DatanodeWalConfig> for MetasrvWalConfig {
+fn from(config: DatanodeWalConfig) -> Self {
match config {
-StandaloneWalConfig::RaftEngine(_) => Self::RaftEngine,
-StandaloneWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
+DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
+DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
broker_endpoints: config.broker_endpoints,
-num_topics: config.num_topics,
-selector_type: config.selector_type,
-topic_name_prefix: config.topic_name_prefix,
-num_partitions: config.num_partitions,
-replication_factor: config.replication_factor,
-create_topic_timeout: config.create_topic_timeout,
-backoff: config.backoff,
+kafka_topic: config.kafka_topic,
}),
}
}
}
-impl From<MetasrvWalConfig> for StandaloneWalConfig {
+impl From<MetasrvWalConfig> for DatanodeWalConfig {
fn from(config: MetasrvWalConfig) -> Self {
match config {
MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
-MetasrvWalConfig::Kafka(config) => Self::Kafka(StandaloneKafkaConfig {
+MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
-num_topics: config.num_topics,
-selector_type: config.selector_type,
-topic_name_prefix: config.topic_name_prefix,
-num_partitions: config.num_partitions,
-replication_factor: config.replication_factor,
-create_topic_timeout: config.create_topic_timeout,
-backoff: config.backoff,
+kafka_topic: config.kafka_topic,
+..Default::default()
}),
}
}
}
-impl From<StandaloneWalConfig> for DatanodeWalConfig {
-fn from(config: StandaloneWalConfig) -> Self {
-match config {
-StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
-StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
-broker_endpoints: config.broker_endpoints,
-max_batch_bytes: config.max_batch_bytes,
-consumer_wait_timeout: config.consumer_wait_timeout,
-backoff: config.backoff,
-}),
-}
-}
-}
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_base::readable_size::ReadableSize;
+use tests::kafka::common::KafkaTopicConfig;
use super::*;
use crate::config::kafka::common::BackoffConfig;
-use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig, StandaloneKafkaConfig};
+use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::TopicSelectorType;
#[test]
@@ -168,11 +131,6 @@ mod tests {
let toml_str = r#"
provider = "kafka"
broker_endpoints = ["127.0.0.1:9092"]
-num_topics = 32
-selector_type = "round_robin"
-topic_name_prefix = "greptimedb_wal_topic"
-replication_factor = 1
-create_topic_timeout = "30s"
max_batch_bytes = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
@@ -180,24 +138,32 @@ mod tests {
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
+num_topics = 32
+num_partitions = 1
+selector_type = "round_robin"
+replication_factor = 1
+create_topic_timeout = "30s"
+topic_name_prefix = "greptimedb_wal_topic"
"#;
// Deserialized to MetasrvWalConfig.
let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
let expected = MetasrvKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-num_topics: 32,
-selector_type: TopicSelectorType::RoundRobin,
-topic_name_prefix: "greptimedb_wal_topic".to_string(),
-num_partitions: 1,
-replication_factor: 1,
-create_topic_timeout: Duration::from_secs(30),
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
+kafka_topic: KafkaTopicConfig {
+num_topics: 32,
+selector_type: TopicSelectorType::RoundRobin,
+topic_name_prefix: "greptimedb_wal_topic".to_string(),
+num_partitions: 1,
+replication_factor: 1,
+create_topic_timeout: Duration::from_secs(30),
+},
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
@@ -213,28 +179,15 @@ mod tests {
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
-};
-assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
-// Deserialized to StandaloneWalConfig.
-let standalone_wal_config: StandaloneWalConfig = toml::from_str(toml_str).unwrap();
-let expected = StandaloneKafkaConfig {
-broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-num_topics: 32,
-selector_type: TopicSelectorType::RoundRobin,
-topic_name_prefix: "greptimedb_wal_topic".to_string(),
-num_partitions: 1,
-replication_factor: 1,
-create_topic_timeout: Duration::from_secs(30),
-max_batch_bytes: ReadableSize::mb(1),
-consumer_wait_timeout: Duration::from_millis(100),
-backoff: BackoffConfig {
-init: Duration::from_millis(500),
-max: Duration::from_secs(10),
-base: 2,
-deadline: Some(Duration::from_secs(60 * 5)),
+kafka_topic: KafkaTopicConfig {
+num_topics: 32,
+selector_type: TopicSelectorType::RoundRobin,
+topic_name_prefix: "greptimedb_wal_topic".to_string(),
+num_partitions: 1,
+replication_factor: 1,
+create_topic_timeout: Duration::from_secs(30),
},
};
-assert_eq!(standalone_wal_config, StandaloneWalConfig::Kafka(expected));
+assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
}
}
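With StandaloneWalConfig gone, the conversions above are all that standalone mode needs: it holds a DatanodeWalConfig and derives the metasrv-side view from it. A small usage sketch of the From impl shown in this hunk, assuming the raft-engine default visible in the old StandaloneWalConfig carries over:

    use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};

    fn main() {
        // DatanodeWalConfig defaults to the raft-engine provider.
        let datanode_wal = DatanodeWalConfig::default();
        // The metasrv-side view is derived rather than configured separately.
        let metasrv_wal: MetasrvWalConfig = datanode_wal.into();
        assert_eq!(metasrv_wal, MetasrvWalConfig::RaftEngine);
    }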

View File

@@ -15,8 +15,6 @@
pub mod common;
pub mod datanode;
pub mod metasrv;
-pub mod standalone;
pub use datanode::DatanodeKafkaConfig;
pub use metasrv::MetasrvKafkaConfig;
-pub use standalone::StandaloneKafkaConfig;

View File

@@ -17,6 +17,8 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
+use crate::{TopicSelectorType, TOPIC_NAME_PREFIX};
with_prefix!(pub backoff_prefix "backoff_");
/// Backoff configurations for kafka clients.
@@ -46,3 +48,35 @@ impl Default for BackoffConfig {
}
}
}
+/// Topic configurations for kafka clients.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct KafkaTopicConfig {
+/// Number of topics to be created upon start.
+pub num_topics: usize,
+/// Number of partitions per topic.
+pub num_partitions: i32,
+/// The type of the topic selector with which to select a topic for a region.
+pub selector_type: TopicSelectorType,
+/// The replication factor of each topic.
+pub replication_factor: i16,
+/// The timeout of topic creation.
+#[serde(with = "humantime_serde")]
+pub create_topic_timeout: Duration,
+/// Topic name prefix.
+pub topic_name_prefix: String,
+}
+impl Default for KafkaTopicConfig {
+fn default() -> Self {
+Self {
+num_topics: 64,
+num_partitions: 1,
+selector_type: TopicSelectorType::RoundRobin,
+replication_factor: 1,
+create_topic_timeout: Duration::from_secs(30),
+topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
+}
+}
+}

View File

@@ -17,7 +17,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
-use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
+use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
use crate::BROKER_ENDPOINT;
/// Kafka wal configurations for datanode.
@@ -36,6 +36,9 @@ pub struct DatanodeKafkaConfig {
/// The backoff config.
#[serde(flatten, with = "backoff_prefix")]
pub backoff: BackoffConfig,
+/// The kafka topic config.
+#[serde(flatten)]
+pub kafka_topic: KafkaTopicConfig,
}
impl Default for DatanodeKafkaConfig {
@@ -46,6 +49,7 @@ impl Default for DatanodeKafkaConfig {
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
+kafka_topic: KafkaTopicConfig::default(),
}
}
}
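Because kafka_topic is embedded with #[serde(flatten)], the topic keys stay at the top level of the TOML section, so existing configuration files keep working unchanged. A round-trip sketch, assuming the toml crate and the serde defaults exercised by the tests above (field values illustrative):

    use common_wal::config::kafka::DatanodeKafkaConfig;

    fn main() {
        // The same flat keys as before the refactor...
        let toml_str = r#"
            broker_endpoints = ["127.0.0.1:9092"]
            num_topics = 32
            replication_factor = 1
        "#;
        // ...now deserialize into the nested KafkaTopicConfig.
        let config: DatanodeKafkaConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.kafka_topic.num_topics, 32);
        assert_eq!(config.kafka_topic.replication_factor, 1);
    }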

View File

@@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::time::Duration;
use serde::{Deserialize, Serialize};
-use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
-use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
+use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
+use crate::BROKER_ENDPOINT;
/// Kafka wal configurations for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -25,37 +23,21 @@ use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
pub struct MetasrvKafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
-/// The number of topics to be created upon start.
-pub num_topics: usize,
-/// The type of the topic selector with which to select a topic for a region.
-pub selector_type: TopicSelectorType,
-/// Topic name prefix.
-pub topic_name_prefix: String,
-/// The number of partitions per topic.
-pub num_partitions: i32,
-/// The replication factor of each topic.
-pub replication_factor: i16,
-/// The timeout of topic creation.
-#[serde(with = "humantime_serde")]
-pub create_topic_timeout: Duration,
/// The backoff config.
#[serde(flatten, with = "backoff_prefix")]
pub backoff: BackoffConfig,
+/// The kafka config.
+#[serde(flatten)]
+pub kafka_topic: KafkaTopicConfig,
}
impl Default for MetasrvKafkaConfig {
fn default() -> Self {
let broker_endpoints = vec![BROKER_ENDPOINT.to_string()];
-let replication_factor = broker_endpoints.len() as i16;
Self {
broker_endpoints,
-num_topics: 64,
-selector_type: TopicSelectorType::RoundRobin,
-topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
-num_partitions: 1,
-replication_factor,
-create_topic_timeout: Duration::from_secs(30),
backoff: BackoffConfig::default(),
+kafka_topic: KafkaTopicConfig::default(),
}
}
}
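Note the default replication_factor: the old Default impl computed it as broker_endpoints.len(), which is 1 for the single default endpoint, and KafkaTopicConfig::default() also uses 1, so the effective default is unchanged. Deployments with more brokers now set it explicitly through the nested struct, as the updated topic_manager test does. A sketch with hypothetical broker endpoints:

    use common_wal::config::kafka::common::KafkaTopicConfig;
    use common_wal::config::kafka::MetasrvKafkaConfig;

    fn main() {
        let broker_endpoints = vec!["b1:9092".to_string(), "b2:9092".to_string()];
        let config = MetasrvKafkaConfig {
            kafka_topic: KafkaTopicConfig {
                // Replicate each topic to every broker in this two-broker cluster.
                replication_factor: broker_endpoints.len() as i16,
                ..Default::default()
            },
            broker_endpoints,
            ..Default::default()
        };
        assert_eq!(config.kafka_topic.replication_factor, 2);
    }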

View File

@@ -21,6 +21,7 @@ use common_query::Output;
use common_recordbatch::util;
use common_telemetry::warn;
use common_test_util::find_workspace_path;
+use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use frontend::instance::Instance;
@@ -231,8 +232,11 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option<Box<dyn RebuildableMoc
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
-topic_name_prefix: test_name.to_string(),
-num_topics: 3,
+kafka_topic: KafkaTopicConfig {
+topic_name_prefix: test_name.to_string(),
+num_topics: 3,
+..Default::default()
+},
..Default::default()
}));
let instance = TestContext::new(MockInstanceBuilder::Standalone(builder)).await;
@@ -261,8 +265,11 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMo
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
-topic_name_prefix: test_name.to_string(),
-num_topics: 3,
+kafka_topic: KafkaTopicConfig {
+topic_name_prefix: test_name.to_string(),
+num_topics: 3,
+..Default::default()
+},
..Default::default()
}));
let instance = TestContext::new(MockInstanceBuilder::Distributed(builder)).await;

View File

@@ -23,6 +23,7 @@ use common_recordbatch::RecordBatches;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir::create_temp_dir;
+use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datatypes::prelude::ScalarVector;
@@ -118,8 +119,11 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
-num_topics: 3,
-topic_name_prefix: Uuid::new_v4().to_string(),
+kafka_topic: KafkaTopicConfig {
+num_topics: 3,
+topic_name_prefix: Uuid::new_v4().to_string(),
+..Default::default()
+},
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))
@@ -247,8 +251,11 @@ pub async fn test_metric_table_region_migration_by_sql(
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
-num_topics: 3,
-topic_name_prefix: Uuid::new_v4().to_string(),
+kafka_topic: KafkaTopicConfig {
+num_topics: 3,
+topic_name_prefix: Uuid::new_v4().to_string(),
+..Default::default()
+},
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))
@@ -369,8 +376,11 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
-num_topics: 3,
-topic_name_prefix: Uuid::new_v4().to_string(),
+kafka_topic: KafkaTopicConfig {
+num_topics: 3,
+topic_name_prefix: Uuid::new_v4().to_string(),
+..Default::default()
+},
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))
@@ -490,8 +500,11 @@ pub async fn test_region_migration_multiple_regions(
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
-num_topics: 3,
-topic_name_prefix: Uuid::new_v4().to_string(),
+kafka_topic: KafkaTopicConfig {
+num_topics: 3,
+topic_name_prefix: Uuid::new_v4().to_string(),
+..Default::default()
+},
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))
@@ -626,8 +639,11 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
-num_topics: 3,
-topic_name_prefix: Uuid::new_v4().to_string(),
+kafka_topic: KafkaTopicConfig {
+num_topics: 3,
+topic_name_prefix: Uuid::new_v4().to_string(),
+..Default::default()
+},
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))
@@ -757,8 +773,11 @@ pub async fn test_region_migration_incorrect_from_peer(
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
-num_topics: 3,
-topic_name_prefix: Uuid::new_v4().to_string(),
+kafka_topic: KafkaTopicConfig {
+num_topics: 3,
+topic_name_prefix: Uuid::new_v4().to_string(),
+..Default::default()
+},
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))
@@ -831,8 +850,11 @@ pub async fn test_region_migration_incorrect_region_id(
}))
.with_metasrv_wal_config(MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
broker_endpoints: endpoints,
-num_topics: 3,
-topic_name_prefix: Uuid::new_v4().to_string(),
+kafka_topic: KafkaTopicConfig {
+num_topics: 3,
+topic_name_prefix: Uuid::new_v4().to_string(),
+..Default::default()
+},
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))