diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index c83c141834..0b47c32ee2 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -100,7 +100,6 @@ impl From<StandaloneWalConfig> for DatanodeWalConfig {
             StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
             StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
                 broker_endpoints: config.broker_endpoints,
-                compression: config.compression,
                 max_batch_bytes: config.max_batch_bytes,
                 consumer_wait_timeout: config.consumer_wait_timeout,
                 backoff: config.backoff,
@@ -114,7 +113,6 @@ mod tests {
     use std::time::Duration;
 
     use common_base::readable_size::ReadableSize;
-    use rskafka::client::partition::Compression;
 
     use super::*;
     use crate::config::kafka::common::BackoffConfig;
@@ -207,7 +205,6 @@ mod tests {
         let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
         let expected = DatanodeKafkaConfig {
             broker_endpoints: vec!["127.0.0.1:9092".to_string()],
-            compression: Compression::NoCompression,
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
             backoff: BackoffConfig {
@@ -229,7 +226,6 @@ mod tests {
             num_partitions: 1,
             replication_factor: 1,
             create_topic_timeout: Duration::from_secs(30),
-            compression: Compression::NoCompression,
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
             backoff: BackoffConfig {
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
index b50ac68553..ae97c1017c 100644
--- a/src/common/wal/src/config/kafka/datanode.rs
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -15,7 +15,6 @@
 use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
-use rskafka::client::partition::Compression;
 use serde::{Deserialize, Serialize};
 
 use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
@@ -27,9 +26,6 @@ use crate::BROKER_ENDPOINT;
 pub struct DatanodeKafkaConfig {
     /// The broker endpoints of the Kafka cluster.
     pub broker_endpoints: Vec<String>,
-    /// The compression algorithm used to compress kafka records.
-    #[serde(skip)]
-    pub compression: Compression,
     /// TODO(weny): Remove the alias once we release v0.9.
     /// The max size of a single producer batch.
     #[serde(alias = "max_batch_size")]
@@ -46,7 +42,6 @@ impl Default for DatanodeKafkaConfig {
     fn default() -> Self {
         Self {
             broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
-            compression: Compression::NoCompression,
             // Warning: Kafka has a default limit of 1MB per message in a topic.
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
diff --git a/src/common/wal/src/config/kafka/standalone.rs b/src/common/wal/src/config/kafka/standalone.rs
index ddee160bf6..5a5e445f81 100644
--- a/src/common/wal/src/config/kafka/standalone.rs
+++ b/src/common/wal/src/config/kafka/standalone.rs
@@ -15,7 +15,6 @@
 use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
-use rskafka::client::partition::Compression;
 use serde::{Deserialize, Serialize};
 
 use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
@@ -40,9 +39,6 @@ pub struct StandaloneKafkaConfig {
     /// The timeout of topic creation.
     #[serde(with = "humantime_serde")]
     pub create_topic_timeout: Duration,
-    /// The compression algorithm used to compress kafka records.
-    #[serde(skip)]
-    pub compression: Compression,
     /// TODO(weny): Remove the alias once we release v0.9.
     /// The max size of a single producer batch.
     #[serde(alias = "max_batch_size")]
@@ -67,7 +63,6 @@ impl Default for StandaloneKafkaConfig {
             num_partitions: 1,
             replication_factor,
             create_topic_timeout: Duration::from_secs(30),
-            compression: Compression::NoCompression,
             // Warning: Kafka has a default limit of 1MB per message in a topic.
             max_batch_bytes: ReadableSize::mb(1),
             consumer_wait_timeout: Duration::from_millis(100),
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 5f686e6ace..089f05f008 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -98,7 +98,7 @@ impl ClientManager {
             producer_channel_size: REQUEST_BATCH_SIZE * 2,
             producer_request_batch_size: REQUEST_BATCH_SIZE,
             flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
-            compression: config.compression,
+            compression: Compression::Lz4,
         })
     }
 
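
Note: with this change, Kafka WAL record compression is no longer user-configurable; ClientManager always passes Compression::Lz4 to the producer. Because the removed `compression` fields were marked `#[serde(skip)]`, existing TOML config files never carried the key and are unaffected. Below is a minimal sketch of the new wiring, assuming a stand-in ProducerSettings type and an illustrative REQUEST_BATCH_SIZE value; neither is the crate's actual API, and only the hardcoded Compression::Lz4 comes from this diff.

    use rskafka::client::partition::Compression;

    /// Illustrative stand-in for the producer settings assembled in
    /// `ClientManager` (the real type lives in the log-store crate).
    struct ProducerSettings {
        producer_channel_size: usize,
        producer_request_batch_size: usize,
        flush_batch_size: usize,
        compression: Compression,
    }

    // Assumed value for illustration; not taken from the diff.
    const REQUEST_BATCH_SIZE: usize = 64;

    fn producer_settings(max_batch_bytes: u64) -> ProducerSettings {
        ProducerSettings {
            producer_channel_size: REQUEST_BATCH_SIZE * 2,
            producer_request_batch_size: REQUEST_BATCH_SIZE,
            flush_batch_size: max_batch_bytes as usize,
            // Was `config.compression`; the option is gone from the WAL
            // config structs, so compression is now always Lz4.
            compression: Compression::Lz4,
        }
    }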