diff --git a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml index 58cc188985..26e354f3d0 100644 --- a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml +++ b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml @@ -7,7 +7,8 @@ meta: provider = "kafka" broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"] num_topics = 3 - auto_prune_topic_records = true + auto_prune_interval = "30s" + trigger_flush_threshold = 100 [datanode] [datanode.client] @@ -22,6 +23,7 @@ datanode: provider = "kafka" broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"] linger = "2ms" + overwrite_entry_start_id = true frontend: configData: |- [runtime] diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index 6b2c9992f4..e4763687cd 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -30,10 +30,10 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig { deadline: Some(Duration::from_secs(120)), }; -/// Default interval for active WAL pruning. -pub const DEFAULT_ACTIVE_PRUNE_INTERVAL: Duration = Duration::ZERO; -/// Default limit for concurrent active pruning tasks. -pub const DEFAULT_ACTIVE_PRUNE_TASK_LIMIT: usize = 10; +/// Default interval for auto WAL pruning. +pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO; +/// Default limit for concurrent auto pruning tasks. +pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10; /// Default interval for sending flush request to regions when pruning remote WAL. pub const DEFAULT_TRIGGER_FLUSH_THRESHOLD: u64 = 0; diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index dd659d636e..278e3dd1a5 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -18,8 +18,8 @@ use common_base::readable_size::ReadableSize; use serde::{Deserialize, Serialize}; use crate::config::kafka::common::{ - KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_ACTIVE_PRUNE_INTERVAL, - DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, DEFAULT_TRIGGER_FLUSH_THRESHOLD, + KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL, + DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_TRIGGER_FLUSH_THRESHOLD, }; /// Kafka wal configurations for datanode. @@ -47,9 +47,8 @@ pub struct DatanodeKafkaConfig { pub dump_index_interval: Duration, /// Ignore missing entries during read WAL. pub overwrite_entry_start_id: bool, - // Active WAL pruning. - pub auto_prune_topic_records: bool, // Interval of WAL pruning. + #[serde(with = "humantime_serde")] pub auto_prune_interval: Duration, // Threshold for sending flush request when pruning remote WAL. // `None` stands for never sending flush request. @@ -70,10 +69,9 @@ impl Default for DatanodeKafkaConfig { create_index: true, dump_index_interval: Duration::from_secs(60), overwrite_entry_start_id: false, - auto_prune_topic_records: false, - auto_prune_interval: DEFAULT_ACTIVE_PRUNE_INTERVAL, + auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL, trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD, - auto_prune_parallelism: DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, + auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM, } } } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index acbfbe05c6..d20100af89 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -17,8 +17,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use crate::config::kafka::common::{ - KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_ACTIVE_PRUNE_INTERVAL, - DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, DEFAULT_TRIGGER_FLUSH_THRESHOLD, + KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL, + DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_TRIGGER_FLUSH_THRESHOLD, }; /// Kafka wal configurations for metasrv. @@ -34,6 +34,7 @@ pub struct MetasrvKafkaConfig { // Automatically create topics for WAL. pub auto_create_topics: bool, // Interval of WAL pruning. + #[serde(with = "humantime_serde")] pub auto_prune_interval: Duration, // Threshold for sending flush request when pruning remote WAL. // `None` stands for never sending flush request. @@ -48,9 +49,9 @@ impl Default for MetasrvKafkaConfig { connection: Default::default(), kafka_topic: Default::default(), auto_create_topics: true, - auto_prune_interval: DEFAULT_ACTIVE_PRUNE_INTERVAL, + auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL, trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD, - auto_prune_parallelism: DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, + auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM, } } } diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template index 20987eed9a..a58d802262 100644 --- a/tests/conf/datanode-test.toml.template +++ b/tests/conf/datanode-test.toml.template @@ -15,6 +15,7 @@ sync_write = false provider = "kafka" broker_endpoints = {kafka_wal_broker_endpoints | unescaped} linger = "5ms" +overwrite_entry_start_id = true {{ endif }} [storage] diff --git a/tests/conf/metasrv-test.toml.template b/tests/conf/metasrv-test.toml.template index 3daf6150f3..f4bbbf3ae9 100644 --- a/tests/conf/metasrv-test.toml.template +++ b/tests/conf/metasrv-test.toml.template @@ -18,5 +18,6 @@ broker_endpoints = {kafka_wal_broker_endpoints | unescaped} num_topics = 3 selector_type = "round_robin" topic_name_prefix = "distributed_test_greptimedb_wal_topic" -auto_prune_topic_records = true +auto_prune_interval = "30s" +trigger_flush_threshold = 100 {{ endif }}