diff --git a/Cargo.lock b/Cargo.lock
index 4e146ae096..f84c13c753 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10870,7 +10870,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.6.0"
-source = "git+https://github.com/WenyXu/rskafka.git?rev=bc582e98918def613a882581a1b9331d186d9b2d#bc582e98918def613a882581a1b9331d186d9b2d"
+source = "git+https://github.com/influxdata/rskafka.git?rev=a62120b6c74d68953464b256f858dc1c41a903b4#a62120b6c74d68953464b256f858dc1c41a903b4"
dependencies = [
"bytes",
"chrono",
diff --git a/Cargo.toml b/Cargo.toml
index 071659173d..47ae880b8d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -188,7 +188,7 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
-rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "bc582e98918def613a882581a1b9331d186d9b2d", features = [
+rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "a62120b6c74d68953464b256f858dc1c41a903b4", features = [
"transport-tls",
] }
rstest = "0.25"
diff --git a/config/config.md b/config/config.md
index 075378d025..9b612e902c 100644
--- a/config/config.md
+++ b/config/config.md
@@ -375,16 +375,16 @@
| `datanode.client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
| `wal` | -- | -- | -- |
| `wal.provider` | String | `raft_engine` | -- |
-| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
-| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL. Set to `true` to automatically create topics for WAL. Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
-| `wal.auto_prune_interval` | String | `0s` | Interval of automatically WAL pruning. Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically. |
-| `wal.trigger_flush_threshold` | Integer | `0` | The threshold to trigger a flush operation of a region in automatically WAL pruning. Metasrv will send a flush request to flush the region when: `trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id` where: - `prunable_entry_id` is the maximum entry id that can be pruned of the region. - `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic. Set to `0` to disable the flush operation. |
-| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatically WAL pruning. |
-| `wal.num_topics` | Integer | `64` | Number of topics. |
-| `wal.selector_type` | String | `round_robin` | Topic selector type. Available selector types: - `round_robin` (default) |
-| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. Only accepts strings that match the following regular expression pattern: [a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]* i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
-| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
-| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
+| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. **It's only used when the provider is `kafka`**. |
+| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL. Set to `true` to automatically create topics for WAL. Otherwise, use topics named `topic_name_prefix_[0..num_topics)`. **It's only used when the provider is `kafka`**. |
+| `wal.auto_prune_interval` | String | `10m` | Interval of automatic WAL pruning. Set to `0s` to disable automatic WAL pruning, which deletes unused remote WAL entries periodically. **It's only used when the provider is `kafka`**. |
+| `wal.flush_trigger_size` | String | `512MB` | Estimated size threshold to trigger a flush when using Kafka remote WAL. Since multiple regions may share a Kafka topic, the estimated size is calculated as: `(latest_entry_id - flushed_entry_id) * avg_record_size`. MetaSrv triggers a flush for a region when this estimated size exceeds `flush_trigger_size`. - `latest_entry_id`: The latest entry ID in the topic. - `flushed_entry_id`: The last flushed entry ID for the region. Set to "0" to let the system decide the flush trigger size. **It's only used when the provider is `kafka`**. |
+| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatic WAL pruning. **It's only used when the provider is `kafka`**. |
+| `wal.num_topics` | Integer | `64` | Number of topics used for remote WAL. **It's only used when the provider is `kafka`**. |
+| `wal.selector_type` | String | `round_robin` | Topic selector type. Available selector types: - `round_robin` (default) **It's only used when the provider is `kafka`**. |
+| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. Only accepts strings that match the following regular expression pattern: [a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]* e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. **It's only used when the provider is `kafka`**. |
+| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. **It's only used when the provider is `kafka`**. |
+| `wal.create_topic_timeout` | String | `30s` | The timeout for creating a Kafka topic. **It's only used when the provider is `kafka`**. |
| `event_recorder` | -- | -- | Configuration options for the event recorder. |
| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
| `logging` | -- | -- | The logging options. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index ff3568da0b..261c47f5e8 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -176,50 +176,61 @@ tcp_nodelay = true
# - `kafka`: metasrv **have to be** configured with kafka wal config when using kafka wal provider in datanode.
provider = "raft_engine"
-# Kafka wal config.
-
## The broker endpoints of the Kafka cluster.
+##
+## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
+## **It's only used when the provider is `kafka`**.
auto_create_topics = true
## Interval of automatically WAL pruning.
## Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically.
-auto_prune_interval = "0s"
+## **It's only used when the provider is `kafka`**.
+auto_prune_interval = "10m"
-## The threshold to trigger a flush operation of a region in automatically WAL pruning.
-## Metasrv will send a flush request to flush the region when:
-## `trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`
-## where:
-## - `prunable_entry_id` is the maximum entry id that can be pruned of the region.
-## - `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.
-## Set to `0` to disable the flush operation.
-trigger_flush_threshold = 0
+
+## Estimated size threshold to trigger a flush when using Kafka remote WAL.
+## Since multiple regions may share a Kafka topic, the estimated size is calculated as:
+## (latest_entry_id - flushed_entry_id) * avg_record_size
+## MetaSrv triggers a flush for a region when this estimated size exceeds `flush_trigger_size`.
+## - `latest_entry_id`: The latest entry ID in the topic.
+## - `flushed_entry_id`: The last flushed entry ID for the region.
+## Set to "0" to let the system decide the flush trigger size.
+## **It's only used when the provider is `kafka`**.
+flush_trigger_size = "512MB"
## Concurrent task limit for automatically WAL pruning.
+## **It's only used when the provider is `kafka`**.
auto_prune_parallelism = 10
-## Number of topics.
+## Number of topics used for remote WAL.
+## **It's only used when the provider is `kafka`**.
num_topics = 64
## Topic selector type.
## Available selector types:
## - `round_robin` (default)
+## **It's only used when the provider is `kafka`**.
selector_type = "round_robin"
+
## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## Only accepts strings that match the following regular expression pattern:
## [a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*
## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
+## **It's only used when the provider is `kafka`**.
topic_name_prefix = "greptimedb_wal_topic"
## Expected number of replicas of each partition.
+## **It's only used when the provider is `kafka`**.
replication_factor = 1
-## Above which a topic creation operation will be cancelled.
+## The timeout for creating a Kafka topic.
+## **It's only used when the provider is `kafka`**.
create_topic_timeout = "30s"
# The Kafka SASL configuration.
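The comment block above describes the estimate metasrv compares against `flush_trigger_size`. A minimal standalone sketch of that arithmetic, with illustrative numbers and helper names that are not part of this change:

```rust
/// Illustrative only: the WAL backlog a region is estimated to pin in its
/// shared topic, following the formula documented above.
fn estimated_backlog_bytes(latest_entry_id: u64, flushed_entry_id: u64, avg_record_size: u64) -> u64 {
    latest_entry_id.saturating_sub(flushed_entry_id) * avg_record_size
}

fn main() {
    let flush_trigger_size: u64 = 512 * 1024 * 1024; // the 512MB default above
    // 1_000_000 unflushed entries at ~1 KiB each is roughly 976 MiB, which
    // exceeds 512 MiB, so metasrv would ask the region to flush.
    let estimate = estimated_backlog_bytes(1_500_000, 500_000, 1024);
    assert!(estimate > flush_trigger_size);
}
```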
diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs
index 4beb24c008..83834937dc 100644
--- a/src/common/meta/src/region_registry.rs
+++ b/src/common/meta/src/region_registry.rs
@@ -132,6 +132,32 @@ impl LeaderRegionManifestInfo {
}
}
}
+
+ /// A region is considered inactive if the flushed entry id is less than the topic's latest entry id.
+ ///
+ /// The `topic_latest_entry_id` of a region is updated only when its memtable is empty during a flush.
+ /// This means that within the range `[flushed_entry_id, topic_latest_entry_id]`,
+ /// there is no data written to the memtable.
+ /// Therefore, such a region can be considered inactive.
+ pub fn is_inactive(&self) -> bool {
+ match *self {
+ LeaderRegionManifestInfo::Mito {
+ flushed_entry_id,
+ topic_latest_entry_id,
+ ..
+ } => flushed_entry_id < topic_latest_entry_id,
+ LeaderRegionManifestInfo::Metric {
+ data_flushed_entry_id,
+ data_topic_latest_entry_id,
+ metadata_flushed_entry_id,
+ metadata_topic_latest_entry_id,
+ ..
+ } => {
+ data_flushed_entry_id < data_topic_latest_entry_id
+ || metadata_flushed_entry_id < metadata_topic_latest_entry_id
+ }
+ }
+ }
}
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
@@ -206,3 +232,97 @@ impl LeaderRegionRegistry {
inner.clear();
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn mito_manifest_info(
+ flushed_entry_id: u64,
+ topic_latest_entry_id: u64,
+ ) -> LeaderRegionManifestInfo {
+ LeaderRegionManifestInfo::Mito {
+ flushed_entry_id,
+ topic_latest_entry_id,
+ manifest_version: 1,
+ }
+ }
+
+ fn metric_manifest_info(
+ data_flushed_entry_id: u64,
+ data_topic_latest_entry_id: u64,
+ metadata_flushed_entry_id: u64,
+ metadata_topic_latest_entry_id: u64,
+ ) -> LeaderRegionManifestInfo {
+ LeaderRegionManifestInfo::Metric {
+ data_flushed_entry_id,
+ data_topic_latest_entry_id,
+ metadata_flushed_entry_id,
+ metadata_topic_latest_entry_id,
+ data_manifest_version: 1,
+ metadata_manifest_version: 1,
+ }
+ }
+
+ #[test]
+ fn test_is_inactive_mito() {
+ // inactive: flushed_entry_id < topic_latest_entry_id
+ let info = mito_manifest_info(10, 20);
+ assert!(info.is_inactive());
+ // active: flushed_entry_id == topic_latest_entry_id
+ let info = mito_manifest_info(20, 20);
+ assert!(!info.is_inactive());
+ // active: flushed_entry_id > topic_latest_entry_id
+ let info = mito_manifest_info(30, 20);
+ assert!(!info.is_inactive());
+ }
+
+ #[test]
+ fn test_is_inactive_metric() {
+ // inactive: data_flushed_entry_id < data_topic_latest_entry_id
+ let info = metric_manifest_info(5, 10, 20, 20);
+ assert!(info.is_inactive());
+ // inactive: metadata_flushed_entry_id < metadata_topic_latest_entry_id
+ let info = metric_manifest_info(10, 10, 15, 20);
+ assert!(info.is_inactive());
+ // inactive: both are less
+ let info = metric_manifest_info(1, 2, 3, 4);
+ assert!(info.is_inactive());
+ // active: both are equal
+ let info = metric_manifest_info(10, 10, 20, 20);
+ assert!(!info.is_inactive());
+ // active: both are greater
+ let info = metric_manifest_info(30, 20, 40, 20);
+ assert!(!info.is_inactive());
+ }
+
+ #[test]
+ fn test_prunable_entry_id_mito() {
+ let info = mito_manifest_info(100, 120);
+ // max(100, 120) = 120
+ assert_eq!(info.prunable_entry_id(), 120);
+
+ let info = mito_manifest_info(150, 120);
+ // max(150, 120) = 150
+ assert_eq!(info.prunable_entry_id(), 150);
+
+ let info = mito_manifest_info(0, 0);
+ assert_eq!(info.prunable_entry_id(), 0);
+ }
+
+ #[test]
+ fn test_prunable_entry_id_metric() {
+ let info = metric_manifest_info(100, 120, 90, 110);
+ // data_prunable = max(100,120)=120
+ // metadata_prunable = max(90,110)=110
+ // min(120,110)=110
+ assert_eq!(info.prunable_entry_id(), 110);
+ let info = metric_manifest_info(200, 150, 180, 220);
+ // data_prunable = max(200,150)=200
+ // metadata_prunable = max(180,220)=220
+ // min(200,220)=200
+ assert_eq!(info.prunable_entry_id(), 200);
+ let info = metric_manifest_info(0, 0, 0, 0);
+ assert_eq!(info.prunable_entry_id(), 0);
+ }
+}
diff --git a/src/common/meta/src/stats/topic.rs b/src/common/meta/src/stats/topic.rs
index e5a87f3332..3e0415fb6b 100644
--- a/src/common/meta/src/stats/topic.rs
+++ b/src/common/meta/src/stats/topic.rs
@@ -291,7 +291,8 @@ impl TopicStatsStore {
// Safety: The current topic stats is initialized in the previous step.
let active_bucket = self.active_bucket.as_mut().unwrap();
- debug_assert!(active_bucket.add_stat(datanode_id, stat, millis_ts));
+ let added = active_bucket.add_stat(datanode_id, stat, millis_ts);
+ debug_assert!(added);
}
/// Gets the calculated topic stat for a given topic.
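The change above stops calling `add_stat` inside `debug_assert!`, presumably because `debug_assert!` does not evaluate its argument when debug assertions are disabled, so the side effect would be silently skipped in release builds. A self-contained sketch of that pitfall (not code from this PR):

```rust
fn main() {
    let mut log: Vec<&str> = Vec::new();

    // Anti-pattern: the whole expression, including the push, is skipped
    // when debug assertions are off (typical release builds).
    debug_assert!({
        log.push("only recorded in debug builds");
        true
    });

    // Pattern used in the change above: run the side effect unconditionally,
    // then assert on the captured result.
    let added = {
        log.push("always recorded");
        true
    };
    debug_assert!(added);

    println!("{} entries", log.len()); // 2 with debug assertions enabled, 1 without
}
```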
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index b60e1519e7..4bfcbc46e7 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -19,6 +19,9 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
+use crate::config::kafka::common::{
+ DEFAULT_AUTO_PRUNE_INTERVAL, DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE,
+};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
@@ -55,9 +58,12 @@ impl From<StandaloneWalConfig> for MetasrvWalConfig {
connection: config.connection,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
- auto_prune_interval: config.auto_prune_interval,
- trigger_flush_threshold: config.trigger_flush_threshold,
- auto_prune_parallelism: config.auto_prune_parallelism,
+ // This field won't be used in standalone mode
+ auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
+ // This field won't be used in standalone mode
+ auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
+ // This field won't be used in standalone mode
+ flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
}),
}
}
@@ -200,8 +206,8 @@ mod tests {
},
auto_create_topics: true,
auto_prune_interval: Duration::from_secs(0),
- trigger_flush_threshold: 0,
auto_prune_parallelism: 10,
+ flush_trigger_size: ReadableSize::mb(512),
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs
index 41f9a379db..56145b06c9 100644
--- a/src/common/wal/src/config/kafka/common.rs
+++ b/src/common/wal/src/config/kafka/common.rs
@@ -16,6 +16,7 @@ use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
+use common_base::readable_size::ReadableSize;
use rskafka::client::{Credentials, SaslConfig};
use rskafka::BackoffConfig;
use rustls::{ClientConfig, RootCertStore};
@@ -39,8 +40,8 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO;
/// Default limit for concurrent auto pruning tasks.
pub const DEFAULT_AUTO_PRUNE_PARALLELISM: usize = 10;
-/// Default interval for sending flush request to regions when pruning remote WAL.
-pub const DEFAULT_TRIGGER_FLUSH_THRESHOLD: u64 = 0;
+/// Default size of WAL to trigger flush.
+pub const DEFAULT_FLUSH_TRIGGER_SIZE: ReadableSize = ReadableSize::mb(512);
use crate::error::{self, Result};
use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
index 278e3dd1a5..77cf05397d 100644
--- a/src/common/wal/src/config/kafka/datanode.rs
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -17,10 +17,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
-use crate::config::kafka::common::{
- KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL,
- DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_TRIGGER_FLUSH_THRESHOLD,
-};
+use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
/// Kafka wal configurations for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -47,14 +44,6 @@ pub struct DatanodeKafkaConfig {
pub dump_index_interval: Duration,
/// Ignore missing entries during read WAL.
pub overwrite_entry_start_id: bool,
- // Interval of WAL pruning.
- #[serde(with = "humantime_serde")]
- pub auto_prune_interval: Duration,
- // Threshold for sending flush request when pruning remote WAL.
- // `None` stands for never sending flush request.
- pub trigger_flush_threshold: u64,
- // Limit of concurrent active pruning procedures.
- pub auto_prune_parallelism: usize,
}
impl Default for DatanodeKafkaConfig {
@@ -69,9 +58,6 @@ impl Default for DatanodeKafkaConfig {
create_index: true,
dump_index_interval: Duration::from_secs(60),
overwrite_entry_start_id: false,
- auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
- trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD,
- auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
}
}
}
diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs
index d20100af89..4ed23e5a3b 100644
--- a/src/common/wal/src/config/kafka/metasrv.rs
+++ b/src/common/wal/src/config/kafka/metasrv.rs
@@ -14,11 +14,12 @@
use std::time::Duration;
+use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_AUTO_PRUNE_INTERVAL,
- DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_TRIGGER_FLUSH_THRESHOLD,
+ DEFAULT_AUTO_PRUNE_PARALLELISM, DEFAULT_FLUSH_TRIGGER_SIZE,
};
/// Kafka wal configurations for metasrv.
@@ -36,11 +37,10 @@ pub struct MetasrvKafkaConfig {
// Interval of WAL pruning.
#[serde(with = "humantime_serde")]
pub auto_prune_interval: Duration,
- // Threshold for sending flush request when pruning remote WAL.
- // `None` stands for never sending flush request.
- pub trigger_flush_threshold: u64,
// Limit of concurrent active pruning procedures.
pub auto_prune_parallelism: usize,
+ // The size of WAL to trigger flush.
+ pub flush_trigger_size: ReadableSize,
}
impl Default for MetasrvKafkaConfig {
@@ -50,8 +50,8 @@ impl Default for MetasrvKafkaConfig {
kafka_topic: Default::default(),
auto_create_topics: true,
auto_prune_interval: DEFAULT_AUTO_PRUNE_INTERVAL,
- trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD,
auto_prune_parallelism: DEFAULT_AUTO_PRUNE_PARALLELISM,
+ flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
}
}
}
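For reference, a hedged construction sketch for the updated config; the crate paths are inferred from the imports above, and `ReadableSize::mb` is the constructor already used elsewhere in this diff:

```rust
use common_base::readable_size::ReadableSize;
use common_wal::config::kafka::MetasrvKafkaConfig;

fn main() {
    // Override only the new field; everything else falls back to the
    // `Default` impl shown above.
    let config = MetasrvKafkaConfig {
        flush_trigger_size: ReadableSize::mb(1024),
        ..Default::default()
    };
    assert_eq!(config.auto_prune_parallelism, 10);
}
```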
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 9b5853bce3..3c94bc9f3c 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -16,7 +16,7 @@
use std::path::Path;
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
use common_base::Plugins;
use common_error::ext::BoxedError;
@@ -670,6 +670,7 @@ async fn open_all_regions(
));
}
+ let now = Instant::now();
let open_regions = region_server
.handle_batch_open_requests(
init_regions_parallelism,
@@ -677,6 +678,11 @@ async fn open_all_regions(
ignore_nonexistent_region,
)
.await?;
+ info!(
+ "Opened {} regions in {:?}",
+ open_regions.len(),
+ now.elapsed()
+ );
if !ignore_nonexistent_region {
ensure!(
open_regions.len() == num_regions,
@@ -708,6 +714,8 @@ async fn open_all_regions(
#[cfg(feature = "enterprise")]
if !follower_regions.is_empty() {
+ use tokio::time::Instant;
+
info!(
"going to open {} follower region(s)",
follower_regions.len()
@@ -728,6 +736,7 @@ async fn open_all_regions(
));
}
+ let now = Instant::now();
let open_regions = region_server
.handle_batch_open_requests(
init_regions_parallelism,
@@ -735,6 +744,11 @@ async fn open_all_regions(
ignore_nonexistent_region,
)
.await?;
+ info!(
+ "Opened {} follower regions in {:?}",
+ open_regions.len(),
+ now.elapsed()
+ );
if !ignore_nonexistent_region {
ensure!(
diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs
index 1fb09a737d..9095545bcd 100644
--- a/src/datanode/src/heartbeat/handler.rs
+++ b/src/datanode/src/heartbeat/handler.rs
@@ -119,6 +119,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
| Some((_, Instruction::FlushRegion { .. }))
+ | Some((_, Instruction::FlushRegions { .. }))
)
}
diff --git a/src/datanode/src/heartbeat/handler/flush_region.rs b/src/datanode/src/heartbeat/handler/flush_region.rs
index eec28fc665..09617aa37b 100644
--- a/src/datanode/src/heartbeat/handler/flush_region.rs
+++ b/src/datanode/src/heartbeat/handler/flush_region.rs
@@ -15,7 +15,7 @@
use std::time::Instant;
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
-use common_telemetry::{info, warn};
+use common_telemetry::{debug, warn};
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
@@ -34,30 +34,22 @@ impl HandlerContext {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
- let now = Instant::now();
let result = self.region_server.handle_request(*region_id, request).await;
- let elapsed = now.elapsed();
- info!("Flush region: {}, elapsed: {:?}", region_id, elapsed);
-
match result {
Ok(_) => {}
Err(error::Error::RegionNotFound { .. }) => {
warn!(
- "Received a flush region instruction from meta, but target region: {} is not found., elapsed: {:?}",
- region_id,
- elapsed
+ "Received a flush region instruction from meta, but target region: {} is not found.",
+ region_id
);
}
Err(err) => {
- warn!(
- "Failed to flush region: {}, error: {}, elapsed: {:?}",
- region_id, err, elapsed
- );
+ warn!("Failed to flush region: {}, error: {}", region_id, err);
}
}
}
let elapsed = start_time.elapsed();
- info!(
+ debug!(
"Flush regions: {:?}, elapsed: {:?}",
flush_regions.region_ids, elapsed
);
diff --git a/src/log-store/src/kafka/producer.rs b/src/log-store/src/kafka/producer.rs
index 4d24d9a0d9..121bc5ef9c 100644
--- a/src/log-store/src/kafka/producer.rs
+++ b/src/log-store/src/kafka/producer.rs
@@ -17,7 +17,6 @@ use std::sync::Arc;
use common_telemetry::warn;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, OffsetAt, PartitionClient};
-use rskafka::client::producer::ProduceResult;
use rskafka::record::Record;
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
@@ -112,6 +111,11 @@ impl OrderedBatchProducer {
}
}
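+/// Result of a produce call: the offsets assigned to the produced records and
+/// the size of the encoded produce request.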
+pub struct ProduceResult {
+    pub offsets: Vec<i64>,
+ pub encoded_request_size: usize,
+}
+
#[async_trait::async_trait]
pub trait ProducerClient: std::fmt::Debug + Send + Sync {
async fn produce(
@@ -144,7 +148,10 @@ impl ProducerClient for PartitionClient {
.with_label_values(&[self.topic(), &partition])
.inc();
- Ok(result)
+ Ok(ProduceResult {
+ offsets: result.offsets,
+ encoded_request_size: result.encoded_request_size,
+ })
}
    async fn get_offset(&self, at: OffsetAt) -> rskafka::client::error::Result<i64> {
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index d391468750..b39c813b39 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -927,6 +927,15 @@ pub enum Error {
offset: u64,
},
+ #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
+ GetOffset {
+ topic: String,
+ #[snafu(implicit)]
+ location: Location,
+ #[snafu(source)]
+ error: rskafka::client::error::Error,
+ },
+
#[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
UpdateTopicNameValue {
topic: String,
@@ -981,6 +990,7 @@ impl ErrorExt for Error {
| Error::BuildKafkaClient { .. } => StatusCode::Internal,
Error::DeleteRecords { .. }
+ | Error::GetOffset { .. }
| Error::PeerUnavailable { .. }
| Error::PusherNotFound { .. } => StatusCode::Unexpected,
Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs
index 678971556d..8b78c2aa89 100644
--- a/src/meta-srv/src/lib.rs
+++ b/src/meta-srv/src/lib.rs
@@ -40,6 +40,7 @@ pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;
+pub mod utils;
pub use crate::error::Result;
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 081a86dbc0..9a4d97e1cc 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -72,6 +72,7 @@ use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
+use crate::region::flush_trigger::RegionFlushTickerRef;
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
@@ -462,6 +463,7 @@ pub struct Metasrv {
leader_region_registry: LeaderRegionRegistryRef,
topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
+    region_flush_ticker: Option<RegionFlushTickerRef>,
table_id_sequence: SequenceRef,
reconciliation_manager: ReconciliationManagerRef,
@@ -521,6 +523,9 @@ impl Metasrv {
if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
}
+ if let Some(region_flush_trigger) = &self.region_flush_ticker {
+ leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
+ }
if let Some(customizer) = self.plugins.get::() {
customizer.customize(&mut leadership_change_notifier);
}
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 68e9af3dba..89c064e71c 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -66,6 +66,7 @@ use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::procedure::wal_prune::Context as WalPruneContext;
+use crate::region::flush_trigger::RegionFlushTrigger;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
RegionSupervisorTicker, DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL,
@@ -398,6 +399,23 @@ impl MetasrvBuilder {
};
let ddl_manager = Arc::new(ddl_manager);
+ let region_flush_ticker = if is_remote_wal {
+ let remote_wal_options = options.wal.remote_wal_options().unwrap();
+ let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
+ table_metadata_manager.clone(),
+ leader_region_registry.clone(),
+ topic_stats_registry.clone(),
+ mailbox.clone(),
+ options.grpc.server_addr.clone(),
+ remote_wal_options.flush_trigger_size,
+ );
+ region_flush_trigger.try_start()?;
+
+ Some(Arc::new(region_flush_ticker))
+ } else {
+ None
+ };
+
// remote WAL prune ticker and manager
let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
let (tx, rx) = WalPruneManager::channel();
@@ -410,8 +428,6 @@ impl MetasrvBuilder {
client: Arc::new(kafka_client),
table_metadata_manager: table_metadata_manager.clone(),
leader_region_registry: leader_region_registry.clone(),
- server_addr: options.grpc.server_addr.clone(),
- mailbox: mailbox.clone(),
};
let wal_prune_manager = WalPruneManager::new(
table_metadata_manager.clone(),
@@ -419,7 +435,6 @@ impl MetasrvBuilder {
rx,
procedure_manager.clone(),
wal_prune_context,
- remote_wal_options.trigger_flush_threshold,
);
// Start manager in background. Ticker will be started in the main thread to send ticks.
wal_prune_manager.try_start().await?;
@@ -507,6 +522,7 @@ impl MetasrvBuilder {
cache_invalidator,
leader_region_registry,
wal_prune_ticker,
+ region_flush_ticker,
table_id_sequence,
reconciliation_manager,
topic_stats_registry,
diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs
index 2984a91a1c..dd893eb234 100644
--- a/src/meta-srv/src/metrics.rs
+++ b/src/meta-srv/src/metrics.rs
@@ -80,4 +80,7 @@ lazy_static! {
exponential_buckets(0.01, 10.0, 7).unwrap(),
)
.unwrap();
+ /// The triggered region flush total counter.
+ pub static ref METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL: IntCounterVec =
+ register_int_counter_vec!("meta_triggered_region_flushes_total", "meta triggered region flush total", &["topic_name", "region_type"]).unwrap();
}
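A hypothetical call site for the counter added above; the helper name and the way the `topic_name`/`region_type` labels are filled in are illustrative, not code from this PR:

```rust
use crate::metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL;

/// Illustrative helper: count one triggered flush for a region of the given topic.
fn record_triggered_flush(topic_name: &str, region_type: &str) {
    METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
        .with_label_values(&[topic_name, region_type])
        .inc();
}
```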
diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs
index 1189f1a02b..17475b4b41 100644
--- a/src/meta-srv/src/procedure/test_util.rs
+++ b/src/meta-srv/src/procedure/test_util.rs
@@ -192,9 +192,8 @@ pub async fn new_wal_prune_metadata(
n_region: u32,
n_table: u32,
offsets: &[i64],
- threshold: u64,
topic: String,
-) -> (EntryId, Vec<RegionId>) {
+) -> EntryId {
let datanode_id = 1;
let from_peer = Peer::empty(datanode_id);
let mut min_prunable_entry_id = u64::MAX;
@@ -251,17 +250,7 @@ pub async fn new_wal_prune_metadata(
.unwrap();
}
- let regions_to_flush = region_entry_ids
- .iter()
- .filter_map(|(region_id, prunable_entry_id)| {
- if max_prunable_entry_id - prunable_entry_id > threshold {
- Some(*region_id)
- } else {
- None
- }
- })
-        .collect::<Vec<_>>();
- (min_prunable_entry_id, regions_to_flush)
+ min_prunable_entry_id
}
pub async fn update_in_memory_region_flushed_entry_id(
diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs
index ff16f3db06..37fede668e 100644
--- a/src/meta-srv/src/procedure/wal_prune.rs
+++ b/src/meta-srv/src/procedure/wal_prune.rs
@@ -16,16 +16,13 @@ pub(crate) mod manager;
#[cfg(test)]
mod test_util;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
-use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
-use common_meta::instruction::{FlushRegions, Instruction};
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
-use common_meta::peer::Peer;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
@@ -36,7 +33,7 @@ use common_telemetry::{info, warn};
use itertools::{Itertools, MinMaxResult};
use log_store::kafka::DEFAULT_PARTITION;
use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
-use rskafka::client::partition::UnknownTopicHandling;
+use rskafka::client::partition::{OffsetAt, UnknownTopicHandling};
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -47,7 +44,6 @@ use crate::error::{
self, BuildPartitionClientSnafu, DeleteRecordsSnafu, TableMetadataManagerSnafu,
UpdateTopicNameValueSnafu,
};
-use crate::service::mailbox::{Channel, MailboxRef};
use crate::Result;
pub type KafkaClientRef = Arc<Client>;
@@ -58,7 +54,6 @@ const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);
#[derive(Debug, Serialize, Deserialize)]
pub enum WalPruneState {
Prepare,
- FlushRegion,
Prune,
}
@@ -70,10 +65,6 @@ pub struct Context {
pub table_metadata_manager: TableMetadataManagerRef,
/// The leader region registry.
pub leader_region_registry: LeaderRegionRegistryRef,
- /// Server address of metasrv.
- pub server_addr: String,
- /// The mailbox to send messages.
- pub mailbox: MailboxRef,
}
/// The data of WAL pruning.
@@ -83,10 +74,6 @@ pub struct WalPruneData {
pub topic: String,
/// The minimum flush entry id for topic, which is used to prune the WAL.
pub prunable_entry_id: EntryId,
-    pub regions_to_flush: Vec<RegionId>,
- /// If `prunable_entry_id` + `trigger_flush_threshold` < `max_prunable_entry_id`, send a flush request to the region.
- /// If `None`, never send flush requests.
- pub trigger_flush_threshold: u64,
/// The state.
pub state: WalPruneState,
}
@@ -101,18 +88,11 @@ pub struct WalPruneProcedure {
impl WalPruneProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
- pub fn new(
- topic: String,
- context: Context,
- trigger_flush_threshold: u64,
- guard: Option,
- ) -> Self {
+ pub fn new(topic: String, context: Context, guard: Option) -> Self {
Self {
data: WalPruneData {
topic,
prunable_entry_id: 0,
- trigger_flush_threshold,
- regions_to_flush: vec![],
state: WalPruneState::Prepare,
},
context,
@@ -134,60 +114,6 @@ impl WalPruneProcedure {
})
}
- async fn build_peer_to_region_ids_map(
- &self,
- ctx: &Context,
- region_ids: &[RegionId],
-    ) -> Result<HashMap<Peer, Vec<RegionId>>> {
- let table_ids = region_ids
- .iter()
- .map(|region_id| region_id.table_id())
-            .collect::<HashSet<_>>()
- .into_iter()
-            .collect::<Vec<_>>();
- let table_ids_table_routes_map = ctx
- .table_metadata_manager
- .table_route_manager()
- .batch_get_physical_table_routes(&table_ids)
- .await
- .context(TableMetadataManagerSnafu)?;
-
- let mut peer_region_ids_map = HashMap::new();
- for region_id in region_ids {
- let table_id = region_id.table_id();
- let table_route = match table_ids_table_routes_map.get(&table_id) {
- Some(route) => route,
- None => return error::TableRouteNotFoundSnafu { table_id }.fail(),
- };
- for region_route in &table_route.region_routes {
- if region_route.region.id != *region_id {
- continue;
- }
- if let Some(peer) = ®ion_route.leader_peer {
- peer_region_ids_map
- .entry(peer.clone())
- .or_insert_with(Vec::new)
- .push(*region_id);
- }
- }
- }
- Ok(peer_region_ids_map)
- }
-
- fn build_flush_region_instruction(
- &self,
-        peer_region_ids_map: HashMap<Peer, Vec<RegionId>>,
-    ) -> Result<Vec<(Peer, Instruction)>> {
- let peer_and_instructions = peer_region_ids_map
- .into_iter()
- .map(|(peer, region_ids)| {
- let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });
- (peer.clone(), flush_instruction)
- })
- .collect();
- Ok(peer_and_instructions)
- }
-
/// Prepare the entry id to prune and regions to flush.
///
/// Retry:
@@ -211,10 +137,6 @@ impl WalPruneProcedure {
.into_iter()
.map(|(region_id, region)| {
let prunable_entry_id = region.manifest.prunable_entry_id();
- info!(
- "Region {}, topic: {}, prunable_entry_id: {}",
- region_id, self.data.topic, prunable_entry_id
- );
(region_id, prunable_entry_id)
})
.collect();
@@ -226,73 +148,24 @@ impl WalPruneProcedure {
// The heartbeat collected region ids do not contain all region ids in the topic-region map.
// In this case, we should not prune the WAL.
warn!("The heartbeat collected region ids do not contain all region ids in the topic-region map. Aborting the WAL prune procedure.
- topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids);
+ topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids);
return Ok(Status::done());
}
let min_max_result = prunable_entry_ids_map.values().minmax();
- let max_prunable_entry_id = match min_max_result {
+ match min_max_result {
MinMaxResult::NoElements => {
return Ok(Status::done());
}
MinMaxResult::OneElement(prunable_entry_id) => {
self.data.prunable_entry_id = *prunable_entry_id;
- *prunable_entry_id
}
- MinMaxResult::MinMax(min_prunable_entry_id, max_prunable_entry_id) => {
+ MinMaxResult::MinMax(min_prunable_entry_id, _) => {
self.data.prunable_entry_id = *min_prunable_entry_id;
- *max_prunable_entry_id
}
};
- if self.data.trigger_flush_threshold != 0 {
- for (region_id, prunable_entry_id) in prunable_entry_ids_map {
- if prunable_entry_id + self.data.trigger_flush_threshold < max_prunable_entry_id {
- self.data.regions_to_flush.push(region_id);
- }
- }
- info!(
- "Flush regions: {:?}, trigger_flush_threshold: {}, prunable_entry_id: {}, max_prunable_entry_id: {}",
- self.data.regions_to_flush,
- self.data.trigger_flush_threshold,
- self.data.prunable_entry_id,
- max_prunable_entry_id
- );
- self.data.state = WalPruneState::FlushRegion;
- } else {
- self.data.state = WalPruneState::Prune;
- }
- Ok(Status::executing(true))
- }
-
- /// Send the flush request to regions with low flush entry id.
- ///
- /// Retry:
- /// - Failed to build peer to region ids map. It means failure in retrieving metadata.
- pub async fn on_sending_flush_request(&mut self) -> Result {
- let peer_to_region_ids_map = self
- .build_peer_to_region_ids_map(&self.context, &self.data.regions_to_flush)
- .await
- .map_err(BoxedError::new)
- .with_context(|_| error::RetryLaterWithSourceSnafu {
- reason: "Failed to build peer to region ids map",
- })?;
- let flush_instructions = self.build_flush_region_instruction(peer_to_region_ids_map)?;
- for (peer, flush_instruction) in flush_instructions.into_iter() {
- let msg = MailboxMessage::json_message(
- &format!("Flush regions: {}", flush_instruction),
- &format!("Metasrv@{}", self.context.server_addr),
- &format!("Datanode-{}@{}", peer.id, peer.addr),
- common_time::util::current_time_millis(),
- &flush_instruction,
- )
- .with_context(|_| error::SerializeToJsonSnafu {
- input: flush_instruction.to_string(),
- })?;
- let ch = Channel::Datanode(peer.id);
- self.context.mailbox.send_oneway(&ch, msg).await?;
- }
self.data.state = WalPruneState::Prune;
- Ok(Status::executing(true))
+ Ok(Status::executing(false))
}
/// Prune the WAL and persist the minimum prunable entry id.
@@ -301,7 +174,6 @@ impl WalPruneProcedure {
/// - Failed to update the minimum prunable entry id in kvbackend.
/// - Failed to delete records.
pub async fn on_prune(&mut self) -> Result {
- // Safety: `prunable_entry_id`` are loaded in on_prepare.
let partition_client = self
.context
.client
@@ -315,6 +187,25 @@ impl WalPruneProcedure {
topic: self.data.topic.clone(),
partition: DEFAULT_PARTITION,
})?;
+ let earliest_offset = partition_client
+ .get_offset(OffsetAt::Earliest)
+ .await
+ .context(error::GetOffsetSnafu {
+ topic: self.data.topic.clone(),
+ })?;
+ let latest_offset = partition_client
+ .get_offset(OffsetAt::Latest)
+ .await
+ .context(error::GetOffsetSnafu {
+ topic: self.data.topic.clone(),
+ })?;
+ if self.data.prunable_entry_id <= earliest_offset as u64 {
+ warn!(
+ "The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}",
+ self.data.topic, self.data.prunable_entry_id, earliest_offset as u64
+ );
+ return Ok(Status::done());
+ }
// Should update the min prunable entry id in the kv backend before deleting records.
// Otherwise, when a datanode restarts, it will not be able to find the wal entries.
@@ -359,13 +250,14 @@ impl WalPruneProcedure {
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
- "Failed to delete records for topic: {}, partition: {}, offset: {}",
- self.data.topic, DEFAULT_PARTITION, self.data.prunable_entry_id
+ "Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}",
+ self.data.topic, self.data.prunable_entry_id, latest_offset as u64
),
})?;
+
info!(
- "Successfully pruned WAL for topic: {}, entry id: {}",
- self.data.topic, self.data.prunable_entry_id
+ "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
+ self.data.topic, self.data.prunable_entry_id, latest_offset as u64
);
Ok(Status::done())
}
@@ -386,7 +278,6 @@ impl Procedure for WalPruneProcedure {
match state {
WalPruneState::Prepare => self.on_prepare().await,
- WalPruneState::FlushRegion => self.on_sending_flush_request().await,
WalPruneState::Prune => self.on_prune().await,
}
.map_err(|e| {
@@ -430,14 +321,11 @@ fn check_heartbeat_collected_region_ids(
mod tests {
use std::assert_matches::assert_matches;
- use api::v1::meta::HeartbeatResponse;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use rskafka::record::Record;
- use tokio::sync::mpsc::Receiver;
use super::*;
- use crate::handler::HeartbeatMailbox;
use crate::procedure::test_util::new_wal_prune_metadata;
// Fix this import to correctly point to the test_util module
use crate::procedure::wal_prune::test_util::TestEnv;
@@ -446,7 +334,7 @@ mod tests {
/// Including:
/// 1. Prepare some data in the table metadata manager and in-memory kv backend.
/// 2. Return the procedure, the minimum last entry id to prune and the regions to flush.
-    async fn mock_test_data(procedure: &WalPruneProcedure) -> (u64, Vec<RegionId>) {
+ async fn mock_test_data(procedure: &WalPruneProcedure) -> u64 {
let n_region = 10;
let n_table = 5;
// 5 entries per region.
@@ -456,17 +344,16 @@ mod tests {
(n_region * n_table * 5) as usize,
)
.await;
- let (prunable_entry_id, regions_to_flush) = new_wal_prune_metadata(
+ let prunable_entry_id = new_wal_prune_metadata(
procedure.context.table_metadata_manager.clone(),
procedure.context.leader_region_registry.clone(),
n_region,
n_table,
&offsets,
- procedure.data.trigger_flush_threshold,
procedure.data.topic.clone(),
)
.await;
- (prunable_entry_id, regions_to_flush)
+ prunable_entry_id
}
fn record(i: usize) -> Record {
@@ -541,26 +428,6 @@ mod tests {
.unwrap();
}
- async fn check_flush_request(
- rx: &mut Receiver>,
- region_ids: &[RegionId],
- ) {
- let resp = rx.recv().await.unwrap().unwrap();
- let msg = resp.mailbox_message.unwrap();
- let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
- let mut flush_requested_region_ids = match flush_instruction {
- Instruction::FlushRegions(FlushRegions { region_ids, .. }) => region_ids,
- _ => unreachable!(),
- };
- let sorted_region_ids = region_ids
- .iter()
- .cloned()
- .sorted_by_key(|a| a.as_u64())
-            .collect::<Vec<_>>();
- flush_requested_region_ids.sort_by_key(|a| a.as_u64());
- assert_eq!(flush_requested_region_ids, sorted_region_ids);
- }
-
#[tokio::test]
async fn test_procedure_execution() {
maybe_skip_kafka_integration_test!();
@@ -570,53 +437,26 @@ mod tests {
let mut topic_name = uuid::Uuid::new_v4().to_string();
// Topic should start with a letter.
topic_name = format!("test_procedure_execution-{}", topic_name);
- let mut env = TestEnv::new();
+ let env = TestEnv::new();
let context = env.build_wal_prune_context(broker_endpoints).await;
TestEnv::prepare_topic(&context.client, &topic_name).await;
- let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);
+ let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, None);
- // Before any data in kvbackend is mocked, should return a retryable error.
- let result = procedure.on_prune().await;
- assert_matches!(result, Err(e) if e.is_retryable());
-
- let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await;
+ let prunable_entry_id = mock_test_data(&procedure).await;
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
- persist: true,
- clean_poisons: false
- }
- );
- assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
- assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
- assert_eq!(
- procedure.data.regions_to_flush.len(),
- regions_to_flush.len()
- );
- for region_id in ®ions_to_flush {
- assert!(procedure.data.regions_to_flush.contains(region_id));
- }
-
- // Step 2: Test `on_sending_flush_request`.
- let (tx, mut rx) = tokio::sync::mpsc::channel(1);
- env.mailbox
- .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
- .await;
- let status = procedure.on_sending_flush_request().await.unwrap();
- check_flush_request(&mut rx, ®ions_to_flush).await;
- assert_matches!(
- status,
- Status::Executing {
- persist: true,
+ persist: false,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::Prune);
+ assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
- // Step 3: Test `on_prune`.
+ // Step 2: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check if the entry ids after(include) `prunable_entry_id` still exist.
@@ -645,18 +485,12 @@ mod tests {
.unwrap();
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
- // Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
+ // Step 3: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
procedure.context.leader_region_registry.reset();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: None });
- // Step 5: Test `on_prepare`, don't flush regions.
- procedure.data.trigger_flush_threshold = 0;
- procedure.on_prepare().await.unwrap();
- assert_matches!(procedure.data.state, WalPruneState::Prune);
- assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
-
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;
}
diff --git a/src/meta-srv/src/procedure/wal_prune/manager.rs b/src/meta-srv/src/procedure/wal_prune/manager.rs
index e2f386b0b6..3ad29d29be 100644
--- a/src/meta-srv/src/procedure/wal_prune/manager.rs
+++ b/src/meta-srv/src/procedure/wal_prune/manager.rs
@@ -15,20 +15,17 @@
use std::collections::hash_set::Entry;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
-use std::sync::{Arc, Mutex, RwLock};
-use std::time::Duration;
+use std::sync::{Arc, RwLock};
use common_meta::key::TableMetadataManagerRef;
-use common_meta::leadership_notifier::LeadershipChangeListener;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
-use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::join_all;
use snafu::{OptionExt, ResultExt};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Semaphore;
-use tokio::time::{interval_at, Instant, MissedTickBehavior};
+use crate::define_ticker;
use crate::error::{self, Result};
use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
@@ -97,81 +94,13 @@ impl Debug for Event {
}
}
-/// [WalPruneTicker] is a ticker that periodically sends [Event]s to the [WalPruneManager].
-/// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s.
-pub(crate) struct WalPruneTicker {
- /// Handle of ticker thread.
- pub(crate) tick_handle: Mutex