mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 16:30:39 +00:00
refactor: remove backoff config (#5808)
* refactor: remove backoff config * chore: update config.md * fix: correct backoff config * chore: change deadline to 120s
This commit is contained in:
@@ -15,13 +15,13 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::{error, info};
|
||||
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
|
||||
use common_wal::config::kafka::MetasrvKafkaConfig;
|
||||
use rskafka::client::error::Error as RsKafkaError;
|
||||
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
|
||||
use rskafka::client::partition::{Compression, UnknownTopicHandling};
|
||||
use rskafka::client::{Client, ClientBuilder};
|
||||
use rskafka::record::Record;
|
||||
use rskafka::BackoffConfig;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{
|
||||
@@ -135,16 +135,10 @@ impl KafkaTopicCreator {
|
||||
|
||||
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
|
||||
// Builds an kafka controller client for creating topics.
|
||||
let backoff_config = BackoffConfig {
|
||||
init_backoff: config.backoff.init,
|
||||
max_backoff: config.backoff.max,
|
||||
base: config.backoff.base as f64,
|
||||
deadline: config.backoff.deadline,
|
||||
};
|
||||
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
|
||||
.await
|
||||
.context(ResolveKafkaEndpointSnafu)?;
|
||||
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
|
||||
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
|
||||
if let Some(sasl) = &config.connection.sasl {
|
||||
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
|
||||
};
|
||||
|
||||
@@ -51,7 +51,6 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
|
||||
DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
|
||||
DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
|
||||
connection: config.connection,
|
||||
backoff: config.backoff,
|
||||
kafka_topic: config.kafka_topic,
|
||||
auto_create_topics: config.auto_create_topics,
|
||||
}),
|
||||
@@ -65,7 +64,6 @@ impl From<MetasrvWalConfig> for DatanodeWalConfig {
|
||||
MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
|
||||
MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
|
||||
connection: config.connection,
|
||||
backoff: config.backoff,
|
||||
kafka_topic: config.kafka_topic,
|
||||
..Default::default()
|
||||
}),
|
||||
@@ -84,7 +82,6 @@ mod tests {
|
||||
use tests::kafka::common::KafkaTopicConfig;
|
||||
|
||||
use super::*;
|
||||
use crate::config::kafka::common::BackoffConfig;
|
||||
use crate::config::{DatanodeKafkaConfig, MetasrvKafkaConfig};
|
||||
use crate::TopicSelectorType;
|
||||
|
||||
@@ -175,12 +172,6 @@ mod tests {
|
||||
client_key_path: None,
|
||||
}),
|
||||
},
|
||||
backoff: BackoffConfig {
|
||||
init: Duration::from_millis(500),
|
||||
max: Duration::from_secs(10),
|
||||
base: 2,
|
||||
deadline: Some(Duration::from_secs(60 * 5)),
|
||||
},
|
||||
kafka_topic: KafkaTopicConfig {
|
||||
num_topics: 32,
|
||||
selector_type: TopicSelectorType::RoundRobin,
|
||||
@@ -212,12 +203,6 @@ mod tests {
|
||||
},
|
||||
max_batch_bytes: ReadableSize::mb(1),
|
||||
consumer_wait_timeout: Duration::from_millis(100),
|
||||
backoff: BackoffConfig {
|
||||
init: Duration::from_millis(500),
|
||||
max: Duration::from_secs(10),
|
||||
base: 2,
|
||||
deadline: Some(Duration::from_secs(60 * 5)),
|
||||
},
|
||||
kafka_topic: KafkaTopicConfig {
|
||||
num_topics: 32,
|
||||
selector_type: TopicSelectorType::RoundRobin,
|
||||
|
||||
@@ -17,44 +17,22 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use rskafka::client::{Credentials, SaslConfig};
|
||||
use rskafka::BackoffConfig;
|
||||
use rustls::{ClientConfig, RootCertStore};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::with_prefix;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
/// The default backoff config for kafka client.
|
||||
pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
|
||||
init_backoff: Duration::from_millis(100),
|
||||
max_backoff: Duration::from_secs(10),
|
||||
base: 2.0,
|
||||
deadline: Some(Duration::from_secs(120)),
|
||||
};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
|
||||
|
||||
with_prefix!(pub backoff_prefix "backoff_");
|
||||
|
||||
/// Backoff configurations for kafka client.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct BackoffConfig {
|
||||
/// The initial backoff delay.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub init: Duration,
|
||||
/// The maximum backoff delay.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub max: Duration,
|
||||
/// The exponential backoff rate, i.e. next backoff = base * current backoff.
|
||||
pub base: u32,
|
||||
/// The deadline of retries. `None` stands for no deadline.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub deadline: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Default for BackoffConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
init: Duration::from_millis(500),
|
||||
max: Duration::from_secs(10),
|
||||
base: 2,
|
||||
deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The SASL configurations for kafka client.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct KafkaClientSasl {
|
||||
|
||||
@@ -18,7 +18,7 @@ use common_base::readable_size::ReadableSize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::common::KafkaConnectionConfig;
|
||||
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
|
||||
use crate::config::kafka::common::KafkaTopicConfig;
|
||||
|
||||
/// Kafka wal configurations for datanode.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
@@ -34,9 +34,6 @@ pub struct DatanodeKafkaConfig {
|
||||
/// The consumer wait timeout.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub consumer_wait_timeout: Duration,
|
||||
/// The backoff config.
|
||||
#[serde(flatten, with = "backoff_prefix")]
|
||||
pub backoff: BackoffConfig,
|
||||
/// The kafka topic config.
|
||||
#[serde(flatten)]
|
||||
pub kafka_topic: KafkaTopicConfig,
|
||||
@@ -57,7 +54,6 @@ impl Default for DatanodeKafkaConfig {
|
||||
// Warning: Kafka has a default limit of 1MB per message in a topic.
|
||||
max_batch_bytes: ReadableSize::mb(1),
|
||||
consumer_wait_timeout: Duration::from_millis(100),
|
||||
backoff: BackoffConfig::default(),
|
||||
kafka_topic: KafkaTopicConfig::default(),
|
||||
auto_create_topics: true,
|
||||
create_index: true,
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::common::KafkaConnectionConfig;
|
||||
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
|
||||
use crate::config::kafka::common::KafkaTopicConfig;
|
||||
|
||||
/// Kafka wal configurations for metasrv.
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
@@ -24,9 +24,6 @@ pub struct MetasrvKafkaConfig {
|
||||
/// The kafka connection config.
|
||||
#[serde(flatten)]
|
||||
pub connection: KafkaConnectionConfig,
|
||||
/// The backoff config.
|
||||
#[serde(flatten, with = "backoff_prefix")]
|
||||
pub backoff: BackoffConfig,
|
||||
/// The kafka config.
|
||||
#[serde(flatten)]
|
||||
pub kafka_topic: KafkaTopicConfig,
|
||||
@@ -38,7 +35,6 @@ impl Default for MetasrvKafkaConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
connection: Default::default(),
|
||||
backoff: Default::default(),
|
||||
kafka_topic: Default::default(),
|
||||
auto_create_topics: true,
|
||||
}
|
||||
|
||||
@@ -15,10 +15,10 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
|
||||
use common_wal::config::kafka::DatanodeKafkaConfig;
|
||||
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
|
||||
use rskafka::client::ClientBuilder;
|
||||
use rskafka::BackoffConfig;
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::provider::KafkaProvider;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
@@ -73,16 +73,11 @@ impl ClientManager {
|
||||
global_index_collector: Option<GlobalIndexCollector>,
|
||||
) -> Result<Self> {
|
||||
// Sets backoff config for the top-level kafka client and all clients constructed by it.
|
||||
let backoff_config = BackoffConfig {
|
||||
init_backoff: config.backoff.init,
|
||||
max_backoff: config.backoff.max,
|
||||
base: config.backoff.base as f64,
|
||||
deadline: config.backoff.deadline,
|
||||
};
|
||||
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
|
||||
.await
|
||||
.context(ResolveKafkaEndpointSnafu)?;
|
||||
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
|
||||
let mut builder =
|
||||
ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
|
||||
if let Some(sasl) = &config.connection.sasl {
|
||||
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user