From bc398cf197faf0617d9e32fa4f4c071a25c8c5e6 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Sat, 6 Jul 2024 05:40:18 +0900 Subject: [PATCH] feat(remote wal): set default compresion to LZ4 (#4294) * feat(remote wal): set default compresion to LZ4 * fix: fix test --- .../meta/src/wal_options_allocator/kafka/topic_manager.rs | 2 +- src/common/wal/src/config.rs | 4 ++-- src/common/wal/src/config/kafka/datanode.rs | 2 +- src/common/wal/src/config/kafka/standalone.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index ab64a6fa0f..fb0130d0df 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -173,7 +173,7 @@ impl TopicManager { timestamp: chrono::Utc::now(), headers: Default::default(), }], - Compression::NoCompression, + Compression::Lz4, ) .await .context(ProduceRecordSnafu { topic })?; diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 3a1b87b86a..e01a99e40a 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -207,7 +207,7 @@ mod tests { let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap(); let expected = DatanodeKafkaConfig { broker_endpoints: vec!["127.0.0.1:9092".to_string()], - compression: Compression::default(), + compression: Compression::Lz4, max_batch_bytes: ReadableSize::mb(1), consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig { @@ -229,7 +229,7 @@ mod tests { num_partitions: 1, replication_factor: 1, create_topic_timeout: Duration::from_secs(30), - compression: Compression::default(), + compression: Compression::Lz4, max_batch_bytes: ReadableSize::mb(1), consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig { diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index b50ac68553..3ae47761a8 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -46,7 +46,7 @@ impl Default for DatanodeKafkaConfig { fn default() -> Self { Self { broker_endpoints: vec![BROKER_ENDPOINT.to_string()], - compression: Compression::NoCompression, + compression: Compression::Lz4, // Warning: Kafka has a default limit of 1MB per message in a topic. max_batch_bytes: ReadableSize::mb(1), consumer_wait_timeout: Duration::from_millis(100), diff --git a/src/common/wal/src/config/kafka/standalone.rs b/src/common/wal/src/config/kafka/standalone.rs index ddee160bf6..f334599fd9 100644 --- a/src/common/wal/src/config/kafka/standalone.rs +++ b/src/common/wal/src/config/kafka/standalone.rs @@ -67,7 +67,7 @@ impl Default for StandaloneKafkaConfig { num_partitions: 1, replication_factor, create_topic_timeout: Duration::from_secs(30), - compression: Compression::NoCompression, + compression: Compression::Lz4, // Warning: Kafka has a default limit of 1MB per message in a topic. max_batch_bytes: ReadableSize::mb(1), consumer_wait_timeout: Duration::from_millis(100),