revert: lz4 compression (#4329)

* Revert "test: revert lz4 compression"

This reverts commit 180dda13fa.

* refactor: remove compression field
This commit is contained in:
Weny Xu
2024-07-10 12:24:40 +08:00
committed by GitHub
parent 33ed745049
commit 52a9a748a1
4 changed files with 1 additions and 15 deletions

View File

@@ -100,7 +100,6 @@ impl From<StandaloneWalConfig> for DatanodeWalConfig {
StandaloneWalConfig::RaftEngine(config) => Self::RaftEngine(config),
StandaloneWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
broker_endpoints: config.broker_endpoints,
compression: config.compression,
max_batch_bytes: config.max_batch_bytes,
consumer_wait_timeout: config.consumer_wait_timeout,
backoff: config.backoff,
@@ -114,7 +113,6 @@ mod tests {
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression;
use super::*;
use crate::config::kafka::common::BackoffConfig;
@@ -207,7 +205,6 @@ mod tests {
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
let expected = DatanodeKafkaConfig {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: Compression::NoCompression,
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
@@ -229,7 +226,6 @@ mod tests {
num_partitions: 1,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
compression: Compression::NoCompression,
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {

View File

@@ -15,7 +15,6 @@
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
@@ -27,9 +26,6 @@ use crate::BROKER_ENDPOINT;
pub struct DatanodeKafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// The compression algorithm used to compress kafka records.
#[serde(skip)]
pub compression: Compression,
/// TODO(weny): Remove the alias once we release v0.9.
/// The max size of a single producer batch.
#[serde(alias = "max_batch_size")]
@@ -46,7 +42,6 @@ impl Default for DatanodeKafkaConfig {
fn default() -> Self {
Self {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
compression: Compression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),

View File

@@ -15,7 +15,6 @@
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{backoff_prefix, BackoffConfig};
@@ -40,9 +39,6 @@ pub struct StandaloneKafkaConfig {
/// The timeout of topic creation.
#[serde(with = "humantime_serde")]
pub create_topic_timeout: Duration,
/// The compression algorithm used to compress kafka records.
#[serde(skip)]
pub compression: Compression,
/// TODO(weny): Remove the alias once we release v0.9.
/// The max size of a single producer batch.
#[serde(alias = "max_batch_size")]
@@ -67,7 +63,6 @@ impl Default for StandaloneKafkaConfig {
num_partitions: 1,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
compression: Compression::NoCompression,
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),

View File

@@ -98,7 +98,7 @@ impl ClientManager {
producer_channel_size: REQUEST_BATCH_SIZE * 2,
producer_request_batch_size: REQUEST_BATCH_SIZE,
flush_batch_size: config.max_batch_bytes.as_bytes() as usize,
compression: config.compression,
compression: Compression::Lz4,
})
}