diff --git a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml
index a97f921f8c..58cc188985 100644
--- a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml
+++ b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml
@@ -2,13 +2,13 @@ meta:
configData: |-
[runtime]
global_rt_size = 4
-
+
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
num_topics = 3
+ auto_prune_topic_records = true
-
[datanode]
[datanode.client]
timeout = "120s"
diff --git a/config/config.md b/config/config.md
index d0d7582db5..f34a41d861 100644
--- a/config/config.md
+++ b/config/config.md
@@ -343,6 +343,9 @@
| `wal.provider` | String | `raft_engine` | -- |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
+| `wal.auto_prune_interval` | String | `0s` | Interval of automatic WAL pruning.<br/>Set to `0s` to disable automatic WAL pruning, which periodically deletes unused remote WAL entries. |
+| `wal.trigger_flush_threshold` | Integer | `0` | The threshold to trigger a flush operation of a region during automatic WAL pruning.<br/>Metasrv will send a flush request to the region when:<br/>`trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`<br/>where:<br/>- `prunable_entry_id` is the maximum entry id of the region that can be pruned.<br/>- `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.<br/>Set to `0` to disable the flush operation. |
+| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatic WAL pruning. |
| `wal.num_topics` | Integer | `64` | Number of topics. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>Only accepts strings that match the following regular expression pattern:<br/>[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*<br/>e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
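
The flush-trigger rule above can be read as: a region whose prunable entry id lags the most advanced region in the same topic by more than the threshold receives a flush request, so the topic can eventually be pruned past it. Below is a minimal sketch of that selection rule using plain `std` types; it is illustrative only and not GreptimeDB's API.

```rust
use std::collections::HashMap;

/// Illustrative only: picks regions whose prunable entry id lags the topic's
/// maximum by more than `trigger_flush_threshold` (0 disables the check).
fn regions_to_flush(
    prunable_entry_ids: &HashMap<u64, u64>, // region id -> prunable entry id
    trigger_flush_threshold: u64,
) -> Vec<u64> {
    if trigger_flush_threshold == 0 {
        return Vec::new();
    }
    let Some(&max_prunable_entry_id) = prunable_entry_ids.values().max() else {
        return Vec::new();
    };
    prunable_entry_ids
        .iter()
        .filter(|(_, &prunable_entry_id)| {
            prunable_entry_id + trigger_flush_threshold < max_prunable_entry_id
        })
        .map(|(&region_id, _)| region_id)
        .collect()
}

fn main() {
    let ids = HashMap::from([(1, 100), (2, 5), (3, 90)]);
    // With a threshold of 50, only region 2 lags far enough (5 + 50 < 100) to be flushed.
    assert_eq!(regions_to_flush(&ids, 50), vec![2]);
}
```
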
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 0eb9900c2a..89c92352b2 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -130,6 +130,22 @@ broker_endpoints = ["127.0.0.1:9092"]
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
auto_create_topics = true
+## Interval of automatic WAL pruning.
+## Set to `0s` to disable automatic WAL pruning, which periodically deletes unused remote WAL entries.
+auto_prune_interval = "0s"
+
+## The threshold to trigger a flush operation of a region during automatic WAL pruning.
+## Metasrv will send a flush request to the region when:
+## `trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`
+## where:
+## - `prunable_entry_id` is the maximum entry id of the region that can be pruned.
+## - `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.
+## Set to `0` to disable the flush operation.
+trigger_flush_threshold = 0
+
+## Concurrent task limit for automatic WAL pruning.
+auto_prune_parallelism = 10
+
## Number of topics.
num_topics = 64
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 3177a2446f..320a2849ed 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -779,6 +779,8 @@ impl InformationExtension for StandaloneInformationExtension {
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
+ data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
+ metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
}
})
.collect::<Vec<_>>();
diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs
index 2f45e6bdb9..499ed865a2 100644
--- a/src/common/meta/src/datanode.rs
+++ b/src/common/meta/src/datanode.rs
@@ -94,6 +94,13 @@ pub struct RegionStat {
pub index_size: u64,
/// The manifest info of the region.
pub region_manifest: RegionManifestInfo,
+ /// The latest entry id of the topic used by data.
+ /// **Only used by remote WAL pruning.**
+ pub data_topic_latest_entry_id: u64,
+ /// The latest entry id of the topic used by metadata.
+ /// **Only used by remote WAL pruning.**
+ /// In the mito engine, this is the same as `data_topic_latest_entry_id`.
+ pub metadata_topic_latest_entry_id: u64,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
@@ -264,6 +271,8 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
+ data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
+ metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
}
}
}
diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs
index 344e0fef5b..49d97f8a2f 100644
--- a/src/common/meta/src/region_registry.rs
+++ b/src/common/meta/src/region_registry.rs
@@ -19,7 +19,7 @@ use std::sync::{Arc, RwLock};
use common_telemetry::warn;
use store_api::storage::RegionId;
-use crate::datanode::RegionManifestInfo;
+use crate::datanode::{RegionManifestInfo, RegionStat};
/// Represents information about a leader region in the cluster.
/// Contains the datanode id where the leader is located,
@@ -35,25 +35,22 @@ pub enum LeaderRegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
+ topic_latest_entry_id: u64,
},
Metric {
data_manifest_version: u64,
data_flushed_entry_id: u64,
+ data_topic_latest_entry_id: u64,
metadata_manifest_version: u64,
metadata_flushed_entry_id: u64,
+ metadata_topic_latest_entry_id: u64,
},
}
-impl From<RegionManifestInfo> for LeaderRegionManifestInfo {
- fn from(value: RegionManifestInfo) -> Self {
- match value {
- RegionManifestInfo::Mito {
- manifest_version,
- flushed_entry_id,
- } => LeaderRegionManifestInfo::Mito {
- manifest_version,
- flushed_entry_id,
- },
+impl LeaderRegionManifestInfo {
+ /// Generate a [LeaderRegionManifestInfo] from [RegionStat].
+ pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
+ match region_stat.region_manifest {
RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
@@ -62,14 +59,22 @@ impl From<RegionManifestInfo> for LeaderRegionManifestInfo {
} => LeaderRegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
+ data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
+ metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
+ },
+ RegionManifestInfo::Mito {
+ manifest_version,
+ flushed_entry_id,
+ } => LeaderRegionManifestInfo::Mito {
+ manifest_version,
+ flushed_entry_id,
+ topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
},
}
}
-}
-impl LeaderRegionManifestInfo {
/// Returns the manifest version of the leader region.
pub fn manifest_version(&self) -> u64 {
match self {
@@ -96,17 +101,33 @@ impl LeaderRegionManifestInfo {
}
}
- /// Returns the minimum flushed entry id of the leader region.
- pub fn min_flushed_entry_id(&self) -> u64 {
+ /// Returns prunable entry id of the leader region.
+ /// It is used to determine the entry id that can be pruned in remote wal.
+ ///
+ /// For a mito region, the prunable entry id should be max(flushed_entry_id, latest_entry_id_since_flush).
+ ///
+ /// For a metric region, the prunable entry id should be min(
+ /// max(data_flushed_entry_id, data_latest_entry_id_since_flush),
+ /// max(metadata_flushed_entry_id, metadata_latest_entry_id_since_flush)
+ /// ).
+ pub fn prunable_entry_id(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
flushed_entry_id, ..
} => *flushed_entry_id,
LeaderRegionManifestInfo::Metric {
data_flushed_entry_id,
+ data_topic_latest_entry_id,
metadata_flushed_entry_id,
+ metadata_topic_latest_entry_id,
..
- } => (*data_flushed_entry_id).min(*metadata_flushed_entry_id),
+ } => {
+ let data_prunable_entry_id =
+ (*data_flushed_entry_id).max(*data_topic_latest_entry_id);
+ let metadata_prunable_entry_id =
+ (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
+ data_prunable_entry_id.min(metadata_prunable_entry_id)
+ }
}
}
}
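
For the metric engine the rule above can be exercised in isolation; the following sketch reproduces just that arithmetic as a free function for illustration, not the actual method on `LeaderRegionManifestInfo`.

```rust
/// Illustration of the metric-engine arm above: a metric region's topic can only
/// be pruned up to the slower of its data region and its metadata region.
fn metric_prunable_entry_id(
    data_flushed_entry_id: u64,
    data_topic_latest_entry_id: u64,
    metadata_flushed_entry_id: u64,
    metadata_topic_latest_entry_id: u64,
) -> u64 {
    let data = data_flushed_entry_id.max(data_topic_latest_entry_id);
    let metadata = metadata_flushed_entry_id.max(metadata_topic_latest_entry_id);
    data.min(metadata)
}

fn main() {
    // max(40, 55) = 55, max(70, 10) = 70, min(55, 70) = 55.
    assert_eq!(metric_prunable_entry_id(40, 55, 70, 10), 55);
}
```
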
diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs
index 2aba2a5ee3..a6e1482f04 100644
--- a/src/common/meta/src/wal_options_allocator.rs
+++ b/src/common/meta/src/wal_options_allocator.rs
@@ -30,7 +30,9 @@ use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
use crate::key::NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
-pub use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
+pub use crate::wal_options_allocator::topic_creator::{
+ build_kafka_client, build_kafka_topic_creator,
+};
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.
diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs
index f49d1bf1ca..1a023546d3 100644
--- a/src/common/meta/src/wal_options_allocator/topic_creator.rs
+++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs
@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::sync::Arc;
-
use common_telemetry::{error, info};
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::MetasrvKafkaConfig;
@@ -34,11 +32,9 @@ use crate::error::{
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
-type KafkaClientRef = Arc<Client>;
-
/// Creates topics in kafka.
pub struct KafkaTopicCreator {
- client: KafkaClientRef,
+ client: Client,
/// The number of partitions per topic.
num_partitions: i32,
/// The replication factor of each topic.
@@ -48,7 +44,7 @@ pub struct KafkaTopicCreator {
}
impl KafkaTopicCreator {
- pub fn client(&self) -> &KafkaClientRef {
+ pub fn client(&self) -> &Client {
&self.client
}
@@ -133,7 +129,8 @@ impl KafkaTopicCreator {
}
}
-pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
+/// Builds a kafka [Client](rskafka::client::Client).
+pub async fn build_kafka_client(config: &MetasrvKafkaConfig) -> Result<Client> {
// Builds a Kafka controller client for creating topics.
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
@@ -145,15 +142,19 @@ pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
+pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
+ let client = build_kafka_client(config).await?;
Ok(KafkaTopicCreator {
- client: Arc::new(client),
+ client,
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
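
With the client construction factored into `build_kafka_client`, other metasrv components (such as the prune procedure) can obtain a raw `rskafka` client without going through topic creation. A hedged usage sketch follows, assuming the exports above and the same rskafka calls the procedure uses elsewhere in this PR; the function name is illustrative.

```rust
// Sketch only; error handling is simplified to a boxed error.
use common_meta::wal_options_allocator::build_kafka_client;
use common_wal::config::kafka::MetasrvKafkaConfig;
use rskafka::client::partition::UnknownTopicHandling;

async fn delete_wal_records_up_to(
    config: &MetasrvKafkaConfig,
    topic: &str,
    offset: i64,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let client = build_kafka_client(config).await?;
    // GreptimeDB writes all WAL entries of a topic to partition 0.
    let partition_client = client
        .partition_client(topic, 0, UnknownTopicHandling::Retry)
        .await?;
    // The second argument is the request timeout in milliseconds.
    partition_client.delete_records(offset, 1_000).await?;
    Ok(())
}
```
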
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index 831faef772..921c1faaa3 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -15,6 +15,8 @@
pub mod kafka;
pub mod raft_engine;
+use std::time::Duration;
+
use serde::{Deserialize, Serialize};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
@@ -53,11 +55,32 @@ impl From for MetasrvWalConfig {
connection: config.connection,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
+ auto_prune_interval: config.auto_prune_interval,
+ trigger_flush_threshold: config.trigger_flush_threshold,
+ auto_prune_parallelism: config.auto_prune_parallelism,
}),
}
}
}
+impl MetasrvWalConfig {
+ /// Returns if active wal pruning is enabled.
+ pub fn enable_active_wal_pruning(&self) -> bool {
+ match self {
+ MetasrvWalConfig::RaftEngine => false,
+ MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
+ }
+ }
+
+ /// Returns the remote WAL (Kafka) options, if configured.
+ pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
+ match self {
+ MetasrvWalConfig::RaftEngine => None,
+ MetasrvWalConfig::Kafka(config) => Some(config),
+ }
+ }
+}
+
impl From<MetasrvWalConfig> for DatanodeWalConfig {
fn from(config: MetasrvWalConfig) -> Self {
match config {
@@ -181,6 +204,9 @@ mod tests {
create_topic_timeout: Duration::from_secs(30),
},
auto_create_topics: true,
+ auto_prune_interval: Duration::from_secs(0),
+ trigger_flush_threshold: 0,
+ auto_prune_parallelism: 10,
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
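
A quick illustration of the two new helpers, assuming only the fields added in this PR and `Default` for the rest; this is a sketch, not a test from the repository.

```rust
use std::time::Duration;

use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::config::MetasrvWalConfig;

fn main() {
    // A raft-engine WAL never prunes remote topics.
    assert!(!MetasrvWalConfig::RaftEngine.enable_active_wal_pruning());

    // A Kafka WAL prunes only when auto_prune_interval is non-zero.
    let kafka = MetasrvWalConfig::Kafka(MetasrvKafkaConfig {
        auto_prune_interval: Duration::from_secs(600),
        ..Default::default()
    });
    assert!(kafka.enable_active_wal_pruning());
    assert!(kafka.remote_wal_options().is_some());
}
```
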
diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs
index ea58a3d49e..6b2c9992f4 100644
--- a/src/common/wal/src/config/kafka/common.rs
+++ b/src/common/wal/src/config/kafka/common.rs
@@ -30,6 +30,13 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
deadline: Some(Duration::from_secs(120)),
};
+/// Default interval for active WAL pruning.
+pub const DEFAULT_ACTIVE_PRUNE_INTERVAL: Duration = Duration::ZERO;
+/// Default limit for concurrent active pruning tasks.
+pub const DEFAULT_ACTIVE_PRUNE_TASK_LIMIT: usize = 10;
+/// Default threshold for sending flush requests to regions when pruning remote WAL.
+pub const DEFAULT_TRIGGER_FLUSH_THRESHOLD: u64 = 0;
+
use crate::error::{self, Result};
use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
index 77cf05397d..dd659d636e 100644
--- a/src/common/wal/src/config/kafka/datanode.rs
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -17,7 +17,10 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
-use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
+use crate::config::kafka::common::{
+ KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_ACTIVE_PRUNE_INTERVAL,
+ DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, DEFAULT_TRIGGER_FLUSH_THRESHOLD,
+};
/// Kafka wal configurations for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -44,6 +47,15 @@ pub struct DatanodeKafkaConfig {
pub dump_index_interval: Duration,
/// Ignore missing entries during read WAL.
pub overwrite_entry_start_id: bool,
+ // Whether to automatically prune topic records (active WAL pruning).
+ pub auto_prune_topic_records: bool,
+ // Interval of WAL pruning.
+ pub auto_prune_interval: Duration,
+ // Threshold for sending flush requests when pruning remote WAL.
+ // `0` stands for never sending flush requests.
+ pub trigger_flush_threshold: u64,
+ // Limit of concurrent active pruning procedures.
+ pub auto_prune_parallelism: usize,
}
impl Default for DatanodeKafkaConfig {
@@ -58,6 +70,10 @@ impl Default for DatanodeKafkaConfig {
create_index: true,
dump_index_interval: Duration::from_secs(60),
overwrite_entry_start_id: false,
+ auto_prune_topic_records: false,
+ auto_prune_interval: DEFAULT_ACTIVE_PRUNE_INTERVAL,
+ trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD,
+ auto_prune_parallelism: DEFAULT_ACTIVE_PRUNE_TASK_LIMIT,
}
}
}
diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs
index 27df3569b8..acbfbe05c6 100644
--- a/src/common/wal/src/config/kafka/metasrv.rs
+++ b/src/common/wal/src/config/kafka/metasrv.rs
@@ -12,9 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::time::Duration;
+
use serde::{Deserialize, Serialize};
-use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
+use crate::config::kafka::common::{
+ KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_ACTIVE_PRUNE_INTERVAL,
+ DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, DEFAULT_TRIGGER_FLUSH_THRESHOLD,
+};
/// Kafka wal configurations for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -28,6 +33,13 @@ pub struct MetasrvKafkaConfig {
pub kafka_topic: KafkaTopicConfig,
// Automatically create topics for WAL.
pub auto_create_topics: bool,
+ // Interval of WAL pruning.
+ pub auto_prune_interval: Duration,
+ // Threshold for sending flush requests when pruning remote WAL.
+ // `0` stands for never sending flush requests.
+ pub trigger_flush_threshold: u64,
+ // Limit of concurrent active pruning procedures.
+ pub auto_prune_parallelism: usize,
}
impl Default for MetasrvKafkaConfig {
@@ -36,6 +48,9 @@ impl Default for MetasrvKafkaConfig {
connection: Default::default(),
kafka_topic: Default::default(),
auto_create_topics: true,
+ auto_prune_interval: DEFAULT_ACTIVE_PRUNE_INTERVAL,
+ trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD,
+ auto_prune_parallelism: DEFAULT_ACTIVE_PRUNE_TASK_LIMIT,
}
}
}
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index 7c45fa408f..bd249c5178 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -518,6 +518,13 @@ pub enum Error {
source: common_procedure::Error,
},
+ #[snafu(display("A prune task for topic {} is already running", topic))]
+ PruneTaskAlreadyRunning {
+ topic: String,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
#[snafu(display("Schema already exists, name: {schema_name}"))]
SchemaAlreadyExists {
schema_name: String,
@@ -788,6 +795,14 @@ pub enum Error {
source: common_meta::error::Error,
},
+ #[snafu(display("Failed to build kafka client."))]
+ BuildKafkaClient {
+ #[snafu(implicit)]
+ location: Location,
+ #[snafu(source)]
+ error: common_meta::error::Error,
+ },
+
#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
@@ -875,7 +890,9 @@ impl ErrorExt for Error {
| Error::FlowStateHandler { .. }
| Error::BuildWalOptionsAllocator { .. }
| Error::BuildPartitionClient { .. }
- | Error::DeleteRecords { .. } => StatusCode::Internal,
+ | Error::BuildKafkaClient { .. }
+ | Error::DeleteRecords { .. }
+ | Error::PruneTaskAlreadyRunning { .. } => StatusCode::Internal,
Error::Unsupported { .. } => StatusCode::Unsupported,
diff --git a/src/meta-srv/src/handler/collect_leader_region_handler.rs b/src/meta-srv/src/handler/collect_leader_region_handler.rs
index fd5fab3639..13aee5d234 100644
--- a/src/meta-srv/src/handler/collect_leader_region_handler.rs
+++ b/src/meta-srv/src/handler/collect_leader_region_handler.rs
@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
-use common_meta::region_registry::LeaderRegion;
+use common_meta::region_registry::{LeaderRegion, LeaderRegionManifestInfo};
use store_api::region_engine::RegionRole;
use crate::error::Result;
@@ -44,7 +44,7 @@ impl HeartbeatHandler for CollectLeaderRegionHandler {
continue;
}
- let manifest = stat.region_manifest.into();
+ let manifest = LeaderRegionManifestInfo::from_region_stat(stat);
let value = LeaderRegion {
datanode_id: current_stat.id,
manifest,
@@ -122,6 +122,8 @@ mod tests {
manifest_size: 0,
sst_size: 0,
index_size: 0,
+ data_topic_latest_entry_id: 0,
+ metadata_topic_latest_entry_id: 0,
}
}
@@ -161,6 +163,7 @@ mod tests {
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 1,
flushed_entry_id: 0,
+ topic_latest_entry_id: 0,
},
})
);
@@ -192,6 +195,7 @@ mod tests {
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 2,
flushed_entry_id: 0,
+ topic_latest_entry_id: 0,
},
})
);
@@ -224,6 +228,7 @@ mod tests {
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 2,
flushed_entry_id: 0,
+ topic_latest_entry_id: 0,
},
})
);
diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs
index 81e9bfaeb9..cdcd9d3228 100644
--- a/src/meta-srv/src/handler/failure_handler.rs
+++ b/src/meta-srv/src/handler/failure_handler.rs
@@ -102,6 +102,8 @@ mod tests {
manifest_version: 0,
flushed_entry_id: 0,
},
+ data_topic_latest_entry_id: 0,
+ metadata_topic_latest_entry_id: 0,
}
}
acc.stat = Some(Stat {
diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs
index b89a570b80..d900078897 100644
--- a/src/meta-srv/src/handler/region_lease_handler.rs
+++ b/src/meta-srv/src/handler/region_lease_handler.rs
@@ -160,6 +160,8 @@ mod test {
manifest_version: 0,
flushed_entry_id: 0,
},
+ data_topic_latest_entry_id: 0,
+ metadata_topic_latest_entry_id: 0,
}
}
diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs
index 9a9f0861a8..4b61eeeae3 100644
--- a/src/meta-srv/src/lib.rs
+++ b/src/meta-srv/src/lib.rs
@@ -15,6 +15,7 @@
#![feature(result_flattening)]
#![feature(assert_matches)]
#![feature(extract_if)]
+#![feature(hash_set_entry)]
pub mod bootstrap;
pub mod cache_invalidator;
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 00ac628b61..34b3cac25e 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -61,6 +61,7 @@ use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::lease::lookup_datanode_peer;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
+use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
@@ -407,6 +408,7 @@ pub struct Metasrv {
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
cache_invalidator: CacheInvalidatorRef,
leader_region_registry: LeaderRegionRegistryRef,
+ wal_prune_ticker: Option<WalPruneTickerRef>,
plugins: Plugins,
}
@@ -461,6 +463,9 @@ impl Metasrv {
if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
}
+ if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
+ leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
+ }
if let Some(customizer) = self.plugins.get::() {
customizer.customize(&mut leadership_change_notifier);
}
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 0f1a04e47b..ec8f6ef253 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -37,7 +37,7 @@ use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
-use common_meta::wal_options_allocator::build_wal_options_allocator;
+use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
@@ -58,6 +58,8 @@ use crate::metasrv::{
};
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
+use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
+use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
DEFAULT_TICK_INTERVAL,
@@ -346,6 +348,40 @@ impl MetasrvBuilder {
.context(error::InitDdlManagerSnafu)?,
);
+ // remote WAL prune ticker and manager
+ let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
+ let (tx, rx) = WalPruneManager::channel();
+ // Safety: Must be remote WAL.
+ let remote_wal_options = options.wal.remote_wal_options().unwrap();
+ let kafka_client = build_kafka_client(remote_wal_options)
+ .await
+ .context(error::BuildKafkaClientSnafu)?;
+ let wal_prune_context = WalPruneContext {
+ client: Arc::new(kafka_client),
+ table_metadata_manager: table_metadata_manager.clone(),
+ leader_region_registry: leader_region_registry.clone(),
+ server_addr: options.server_addr.clone(),
+ mailbox: mailbox.clone(),
+ };
+ let wal_prune_manager = WalPruneManager::new(
+ table_metadata_manager.clone(),
+ remote_wal_options.auto_prune_parallelism,
+ rx,
+ procedure_manager.clone(),
+ wal_prune_context,
+ remote_wal_options.trigger_flush_threshold,
+ );
+ // Start manager in background. Ticker will be started in the main thread to send ticks.
+ wal_prune_manager.try_start().await?;
+ let wal_prune_ticker = Arc::new(WalPruneTicker::new(
+ remote_wal_options.auto_prune_interval,
+ tx.clone(),
+ ));
+ Some(wal_prune_ticker)
+ } else {
+ None
+ };
+
let customized_region_lease_renewer = plugins
.as_ref()
.and_then(|plugins| plugins.get::());
@@ -406,6 +442,7 @@ impl MetasrvBuilder {
region_supervisor_ticker,
cache_invalidator,
leader_region_registry,
+ wal_prune_ticker,
})
}
}
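
The builder wires pruning as two halves connected by an mpsc channel: the `WalPruneManager` runs in the background consuming events and submitting procedures, while the `WalPruneTicker` (registered as a leadership listener) periodically sends `Event::Tick`. The sketch below shows only that tick/submit pattern with plain tokio primitives, not the actual manager or ticker API; it assumes tokio with the `macros` and runtime features.

```rust
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::interval;

#[derive(Debug)]
enum Event {
    Tick,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Event>(8);

    // "Manager": consumes ticks; the real one submits one WalPruneProcedure per topic.
    let manager = tokio::spawn(async move {
        while let Some(Event::Tick) = rx.recv().await {
            println!("submit wal prune procedures for all topics");
        }
    });

    // "Ticker": sends a tick on every interval while this node is the leader.
    let ticker = tokio::spawn(async move {
        let mut interval = interval(Duration::from_millis(100));
        for _ in 0..3 {
            interval.tick().await;
            // Ignore send errors: the manager may have shut down.
            let _ = tx.send(Event::Tick).await;
        }
    });

    ticker.await.unwrap();
    // Once the ticker (and its sender) is gone, the manager loop drains and exits.
    manager.await.unwrap();
}
```
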
diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs
index c5e2d3df4d..a9750ae5f3 100644
--- a/src/meta-srv/src/metrics.rs
+++ b/src/meta-srv/src/metrics.rs
@@ -66,4 +66,7 @@ lazy_static! {
// The heartbeat rate counter.
pub static ref METRIC_META_HEARTBEAT_RATE: IntCounter =
register_int_counter!("greptime_meta_heartbeat_rate", "meta heartbeat arrival rate").unwrap();
+ /// The remote WAL prune execute counter.
+ pub static ref METRIC_META_REMOTE_WAL_PRUNE_EXECUTE: IntCounterVec =
+ register_int_counter_vec!("greptime_meta_remote_wal_prune_execute", "meta remote wal prune execute", &["topic_name"]).unwrap();
}
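
The counter is labeled by topic, so each prune execution can be attributed to the topic it pruned. A minimal, self-contained sketch of registering and bumping an equivalent counter with the `prometheus` and `lazy_static` crates; the metric below is a local stand-in, not the one registered above.

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static! {
    // Mirrors the counter above: one time series per topic name.
    static ref REMOTE_WAL_PRUNE_EXECUTE: IntCounterVec = register_int_counter_vec!(
        "greptime_meta_remote_wal_prune_execute",
        "meta remote wal prune execute",
        &["topic_name"]
    )
    .unwrap();
}

fn main() {
    // Each prune submission for a topic bumps the labeled counter.
    REMOTE_WAL_PRUNE_EXECUTE
        .with_label_values(&["greptimedb_wal_topic_0"])
        .inc();
    assert_eq!(
        REMOTE_WAL_PRUNE_EXECUTE
            .with_label_values(&["greptimedb_wal_topic_0"])
            .get(),
        1
    );
}
```
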
diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs
index 34ce23abd4..ca6da59f2a 100644
--- a/src/meta-srv/src/procedure/test_util.rs
+++ b/src/meta-srv/src/procedure/test_util.rs
@@ -179,8 +179,8 @@ pub async fn new_wal_prune_metadata(
) -> (EntryId, Vec<RegionId>) {
let datanode_id = 1;
let from_peer = Peer::empty(datanode_id);
- let mut min_flushed_entry_id = u64::MAX;
- let mut max_flushed_entry_id = 0;
+ let mut min_prunable_entry_id = u64::MAX;
+ let mut max_prunable_entry_id = 0;
let mut region_entry_ids = HashMap::with_capacity(n_table as usize * n_region as usize);
for table_id in 0..n_table {
let region_ids = (0..n_region)
@@ -221,10 +221,10 @@ pub async fn new_wal_prune_metadata(
.iter()
.map(|region_id| {
let rand_n = rand::random::() as usize;
- let current_flushed_entry_id = offsets[rand_n % offsets.len()] as u64;
- min_flushed_entry_id = min_flushed_entry_id.min(current_flushed_entry_id);
- max_flushed_entry_id = max_flushed_entry_id.max(current_flushed_entry_id);
- (*region_id, current_flushed_entry_id)
+ let current_prunable_entry_id = offsets[rand_n % offsets.len()] as u64;
+ min_prunable_entry_id = min_prunable_entry_id.min(current_prunable_entry_id);
+ max_prunable_entry_id = max_prunable_entry_id.max(current_prunable_entry_id);
+ (*region_id, current_prunable_entry_id)
})
.collect::>();
region_entry_ids.extend(current_region_entry_ids.clone());
@@ -235,15 +235,15 @@ pub async fn new_wal_prune_metadata(
let regions_to_flush = region_entry_ids
.iter()
- .filter_map(|(region_id, flushed_entry_id)| {
- if max_flushed_entry_id - flushed_entry_id > threshold {
+ .filter_map(|(region_id, prunable_entry_id)| {
+ if max_prunable_entry_id - prunable_entry_id > threshold {
Some(*region_id)
} else {
None
}
})
.collect::<Vec<_>>();
- (min_flushed_entry_id, regions_to_flush)
+ (min_prunable_entry_id, regions_to_flush)
}
pub async fn update_in_memory_region_flushed_entry_id(
@@ -257,6 +257,7 @@ pub async fn update_in_memory_region_flushed_entry_id(
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 0,
flushed_entry_id,
+ topic_latest_entry_id: 0,
},
};
key_values.push((region_id, value));
diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs
index b7f145fffc..0247c928ec 100644
--- a/src/meta-srv/src/procedure/wal_prune.rs
+++ b/src/meta-srv/src/procedure/wal_prune.rs
@@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+pub(crate) mod manager;
+#[cfg(test)]
+mod test_util;
+
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
@@ -28,9 +32,10 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey,
};
-use common_telemetry::warn;
+use common_telemetry::{info, warn};
use itertools::{Itertools, MinMaxResult};
use log_store::kafka::DEFAULT_PARTITION;
+use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
@@ -45,9 +50,8 @@ use crate::error::{
use crate::service::mailbox::{Channel, MailboxRef};
use crate::Result;
-type KafkaClientRef = Arc<Client>;
+pub type KafkaClientRef = Arc<Client>;
-/// No timeout for flush request.
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(1);
/// The state of WAL pruning.
@@ -58,17 +62,18 @@ pub enum WalPruneState {
Prune,
}
+#[derive(Clone)]
pub struct Context {
/// The Kafka client.
- client: KafkaClientRef,
+ pub client: KafkaClientRef,
/// The table metadata manager.
- table_metadata_manager: TableMetadataManagerRef,
+ pub table_metadata_manager: TableMetadataManagerRef,
/// The leader region registry.
- leader_region_registry: LeaderRegionRegistryRef,
+ pub leader_region_registry: LeaderRegionRegistryRef,
/// Server address of metasrv.
- server_addr: String,
+ pub server_addr: String,
/// The mailbox to send messages.
- mailbox: MailboxRef,
+ pub mailbox: MailboxRef,
}
/// The data of WAL pruning.
@@ -77,10 +82,11 @@ pub struct WalPruneData {
/// The topic name to prune.
pub topic: String,
/// The minimum flush entry id for topic, which is used to prune the WAL.
- pub min_flushed_entry_id: EntryId,
+ pub prunable_entry_id: EntryId,
pub regions_to_flush: Vec,
- /// If `flushed_entry_id` + `trigger_flush_threshold` < `max_flushed_entry_id`, send a flush request to the region.
- pub trigger_flush_threshold: Option<u64>,
+ /// If `prunable_entry_id` + `trigger_flush_threshold` < `max_prunable_entry_id`, send a flush request to the region.
+ /// If `0`, never send flush requests.
+ pub trigger_flush_threshold: u64,
/// The state.
pub state: WalPruneState,
}
@@ -89,27 +95,43 @@ pub struct WalPruneData {
pub struct WalPruneProcedure {
pub data: WalPruneData,
pub context: Context,
+ pub _guard: Option<WalPruneProcedureGuard>,
}
impl WalPruneProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
- pub fn new(topic: String, context: Context, trigger_flush_threshold: Option<u64>) -> Self {
+ pub fn new(
+ topic: String,
+ context: Context,
+ trigger_flush_threshold: u64,
+ guard: Option<WalPruneProcedureGuard>,
+ ) -> Self {
Self {
data: WalPruneData {
topic,
- min_flushed_entry_id: 0,
+ prunable_entry_id: 0,
trigger_flush_threshold,
regions_to_flush: vec![],
state: WalPruneState::Prepare,
},
context,
+ _guard: guard,
}
}
- pub fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
+ pub fn from_json(
+ json: &str,
+ context: &Context,
+ tracker: WalPruneProcedureTracker,
+ ) -> ProcedureResult<Self> {
let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
- Ok(Self { data, context })
+ let guard = tracker.insert_running_procedure(data.topic.clone());
+ Ok(Self {
+ data,
+ context: context.clone(),
+ _guard: guard,
+ })
}
async fn build_peer_to_region_ids_map(
@@ -182,20 +204,20 @@ impl WalPruneProcedure {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to get topic-region map",
})?;
- let flush_entry_ids_map: HashMap<_, _> = self
+ let prunable_entry_ids_map: HashMap<_, _> = self
.context
.leader_region_registry
.batch_get(region_ids.iter().cloned())
.into_iter()
.map(|(region_id, region)| {
- let flushed_entry_id = region.manifest.min_flushed_entry_id();
- (region_id, flushed_entry_id)
+ let prunable_entry_id = region.manifest.prunable_entry_id();
+ (region_id, prunable_entry_id)
})
.collect();
- // Check if the `flush_entry_ids_map` contains all region ids.
+ // Check if the `prunable_entry_ids_map` contains all region ids.
let non_collected_region_ids =
- check_heartbeat_collected_region_ids(®ion_ids, &flush_entry_ids_map);
+ check_heartbeat_collected_region_ids(®ion_ids, &prunable_entry_ids_map);
if !non_collected_region_ids.is_empty() {
// The heartbeat collected region ids do not contain all region ids in the topic-region map.
// In this case, we should not prune the WAL.
@@ -204,23 +226,23 @@ impl WalPruneProcedure {
return Ok(Status::done());
}
- let min_max_result = flush_entry_ids_map.values().minmax();
- let max_flushed_entry_id = match min_max_result {
+ let min_max_result = prunable_entry_ids_map.values().minmax();
+ let max_prunable_entry_id = match min_max_result {
MinMaxResult::NoElements => {
return Ok(Status::done());
}
- MinMaxResult::OneElement(flushed_entry_id) => {
- self.data.min_flushed_entry_id = *flushed_entry_id;
- *flushed_entry_id
+ MinMaxResult::OneElement(prunable_entry_id) => {
+ self.data.prunable_entry_id = *prunable_entry_id;
+ *prunable_entry_id
}
- MinMaxResult::MinMax(min_flushed_entry_id, max_flushed_entry_id) => {
- self.data.min_flushed_entry_id = *min_flushed_entry_id;
- *max_flushed_entry_id
+ MinMaxResult::MinMax(min_prunable_entry_id, max_prunable_entry_id) => {
+ self.data.prunable_entry_id = *min_prunable_entry_id;
+ *max_prunable_entry_id
}
};
- if let Some(threshold) = self.data.trigger_flush_threshold {
- for (region_id, flushed_entry_id) in flush_entry_ids_map {
- if flushed_entry_id + threshold < max_flushed_entry_id {
+ if self.data.trigger_flush_threshold != 0 {
+ for (region_id, prunable_entry_id) in prunable_entry_ids_map {
+ if prunable_entry_id + self.data.trigger_flush_threshold < max_prunable_entry_id {
self.data.regions_to_flush.push(region_id);
}
}
@@ -232,10 +254,17 @@ impl WalPruneProcedure {
}
/// Send the flush request to regions with low flush entry id.
+ ///
+ /// Retry:
+ /// - Failed to build peer to region ids map. It means failure in retrieving metadata.
pub async fn on_sending_flush_request(&mut self) -> Result {
let peer_to_region_ids_map = self
.build_peer_to_region_ids_map(&self.context, &self.data.regions_to_flush)
- .await?;
+ .await
+ .map_err(BoxedError::new)
+ .with_context(|_| error::RetryLaterWithSourceSnafu {
+ reason: "Failed to build peer to region ids map",
+ })?;
let flush_instructions = self.build_flush_region_instruction(peer_to_region_ids_map)?;
for (peer, flush_instruction) in flush_instructions.into_iter() {
let msg = MailboxMessage::json_message(
@@ -255,13 +284,13 @@ impl WalPruneProcedure {
Ok(Status::executing(true))
}
- /// Prune the WAL and persist the minimum flushed entry id.
+ /// Prune the WAL and persist the minimum prunable entry id.
///
/// Retry:
- /// - Failed to update the minimum flushed entry id in kvbackend.
+ /// - Failed to update the minimum prunable entry id in kvbackend.
/// - Failed to delete records.
pub async fn on_prune(&mut self) -> Result {
- // Safety: flushed_entry_ids are loaded in on_prepare.
+ // Safety: `prunable_entry_id` is loaded in on_prepare.
let partition_client = self
.context
.client
@@ -276,7 +305,7 @@ impl WalPruneProcedure {
partition: DEFAULT_PARTITION,
})?;
- // Should update the min flushed entry id in the kv backend before deleting records.
+ // Should update the min prunable entry id in the kv backend before deleting records.
// Otherwise, when a datanode restarts, it will not be able to find the wal entries.
let prev = self
.context
@@ -292,7 +321,7 @@ impl WalPruneProcedure {
self.context
.table_metadata_manager
.topic_name_manager()
- .update(&self.data.topic, self.data.min_flushed_entry_id, prev)
+ .update(&self.data.topic, self.data.prunable_entry_id, prev)
.await
.context(UpdateTopicNameValueSnafu {
topic: &self.data.topic,
@@ -306,14 +335,14 @@ impl WalPruneProcedure {
})?;
partition_client
.delete_records(
- (self.data.min_flushed_entry_id + 1) as i64,
+ (self.data.prunable_entry_id + 1) as i64,
DELETE_RECORDS_TIMEOUT.as_millis() as i32,
)
.await
.context(DeleteRecordsSnafu {
topic: &self.data.topic,
partition: DEFAULT_PARTITION,
- offset: (self.data.min_flushed_entry_id + 1),
+ offset: (self.data.prunable_entry_id + 1),
})
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
@@ -321,9 +350,13 @@ impl WalPruneProcedure {
"Failed to delete records for topic: {}, partition: {}, offset: {}",
self.data.topic,
DEFAULT_PARTITION,
- self.data.min_flushed_entry_id + 1
+ self.data.prunable_entry_id + 1
),
})?;
+ info!(
+ "Successfully pruned WAL for topic: {}, entry id: {}",
+ self.data.topic, self.data.prunable_entry_id
+ );
Ok(Status::done())
}
}
@@ -388,123 +421,41 @@ mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::HeartbeatResponse;
- use common_meta::key::TableMetadataManager;
- use common_meta::kv_backend::memory::MemoryKvBackend;
- use common_meta::region_registry::LeaderRegionRegistry;
- use common_meta::sequence::SequenceBuilder;
- use common_meta::wal_options_allocator::build_kafka_topic_creator;
- use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
- use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use rskafka::record::Record;
use tokio::sync::mpsc::Receiver;
use super::*;
use crate::handler::HeartbeatMailbox;
- use crate::procedure::test_util::{new_wal_prune_metadata, MailboxContext};
-
- struct TestEnv {
- table_metadata_manager: TableMetadataManagerRef,
- leader_region_registry: LeaderRegionRegistryRef,
- mailbox: MailboxContext,
- server_addr: String,
- }
-
- impl TestEnv {
- fn new() -> Self {
- let kv_backend = Arc::new(MemoryKvBackend::new());
- let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
- let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
- let mailbox_sequence =
- SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
-
- let mailbox_ctx = MailboxContext::new(mailbox_sequence);
-
- Self {
- table_metadata_manager,
- leader_region_registry,
- mailbox: mailbox_ctx,
- server_addr: "localhost".to_string(),
- }
- }
-
- fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
- &self.table_metadata_manager
- }
-
- fn leader_region_registry(&self) -> &LeaderRegionRegistryRef {
- &self.leader_region_registry
- }
-
- fn mailbox_context(&self) -> &MailboxContext {
- &self.mailbox
- }
-
- fn server_addr(&self) -> &str {
- &self.server_addr
- }
- }
+ use crate::procedure::test_util::new_wal_prune_metadata;
+ use crate::procedure::wal_prune::test_util::TestEnv;
/// Mock a test env for testing.
/// Including:
/// 1. Prepare some data in the table metadata manager and in-memory kv backend.
- /// 2. Generate a `WalPruneProcedure` with the test env.
- /// 3. Return the procedure, the minimum last entry id to prune and the regions to flush.
- async fn mock_test_env(
- topic: String,
- broker_endpoints: Vec<String>,
- env: &TestEnv,
- ) -> (WalPruneProcedure, u64, Vec<RegionId>) {
- // Creates a topic manager.
- let kafka_topic = KafkaTopicConfig {
- replication_factor: broker_endpoints.len() as i16,
- ..Default::default()
- };
- let config = MetasrvKafkaConfig {
- connection: KafkaConnectionConfig {
- broker_endpoints,
- ..Default::default()
- },
- kafka_topic,
- ..Default::default()
- };
- let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
- let table_metadata_manager = env.table_metadata_manager().clone();
- let leader_region_registry = env.leader_region_registry().clone();
- let mailbox = env.mailbox_context().mailbox().clone();
-
+ /// 2. Return the minimum last entry id to prune and the regions to flush.
+ async fn mock_test_data(procedure: &WalPruneProcedure) -> (u64, Vec<RegionId>) {
let n_region = 10;
let n_table = 5;
- let threshold = 10;
// 5 entries per region.
let offsets = mock_wal_entries(
- topic_creator.client().clone(),
- &topic,
+ procedure.context.client.clone(),
+ &procedure.data.topic,
(n_region * n_table * 5) as usize,
)
.await;
-
- let (min_flushed_entry_id, regions_to_flush) = new_wal_prune_metadata(
- table_metadata_manager.clone(),
- leader_region_registry.clone(),
+ let (prunable_entry_id, regions_to_flush) = new_wal_prune_metadata(
+ procedure.context.table_metadata_manager.clone(),
+ procedure.context.leader_region_registry.clone(),
n_region,
n_table,
&offsets,
- threshold,
- topic.clone(),
+ procedure.data.trigger_flush_threshold,
+ procedure.data.topic.clone(),
)
.await;
-
- let context = Context {
- client: topic_creator.client().clone(),
- table_metadata_manager,
- leader_region_registry,
- mailbox,
- server_addr: env.server_addr().to_string(),
- };
-
- let wal_prune_procedure = WalPruneProcedure::new(topic, context, Some(threshold));
- (wal_prune_procedure, min_flushed_entry_id, regions_to_flush)
+ (prunable_entry_id, regions_to_flush)
}
fn record(i: usize) -> Record {
@@ -603,10 +554,18 @@ mod tests {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
common_telemetry::init_default_ut_logging();
- let topic_name = "greptime_test_topic".to_string();
+ let mut topic_name = uuid::Uuid::new_v4().to_string();
+ // Topic should start with a letter.
+ topic_name = format!("test_procedure_execution-{}", topic_name);
let mut env = TestEnv::new();
- let (mut procedure, min_flushed_entry_id, regions_to_flush) =
- mock_test_env(topic_name.clone(), broker_endpoints, &env).await;
+ let context = env.build_wal_prune_context(broker_endpoints).await;
+ let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);
+
+ // Before any data in kvbackend is mocked, should return a retryable error.
+ let result = procedure.on_prune().await;
+ assert_matches!(result, Err(e) if e.is_retryable());
+
+ let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await;
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
@@ -618,7 +577,7 @@ mod tests {
}
);
assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
- assert_eq!(procedure.data.min_flushed_entry_id, min_flushed_entry_id);
+ assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
assert_eq!(
procedure.data.regions_to_flush.len(),
regions_to_flush.len()
@@ -646,34 +605,31 @@ mod tests {
// Step 3: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
- // Check if the entry ids after `min_flushed_entry_id` still exist.
+ // Check if the entry ids after `prunable_entry_id` still exist.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
- procedure.data.min_flushed_entry_id as i64 + 1,
+ procedure.data.prunable_entry_id as i64 + 1,
true,
)
.await;
- // Check if the entry s before `min_flushed_entry_id` are deleted.
+ // Check if the entries before `prunable_entry_id` are deleted.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
- procedure.data.min_flushed_entry_id as i64,
+ procedure.data.prunable_entry_id as i64,
false,
)
.await;
- let min_entry_id = env
- .table_metadata_manager()
+ let value = env
+ .table_metadata_manager
.topic_name_manager()
.get(&topic_name)
.await
.unwrap()
.unwrap();
- assert_eq!(
- min_entry_id.pruned_entry_id,
- procedure.data.min_flushed_entry_id
- );
+ assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
@@ -682,13 +638,10 @@ mod tests {
assert_matches!(status, Status::Done { output: None });
// Step 5: Test `on_prepare`, don't flush regions.
- procedure.data.trigger_flush_threshold = None;
+ procedure.data.trigger_flush_threshold = 0;
procedure.on_prepare().await.unwrap();
assert_matches!(procedure.data.state, WalPruneState::Prune);
- assert_eq!(
- min_entry_id.pruned_entry_id,
- procedure.data.min_flushed_entry_id
- );
+ assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;
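
Taken together, the procedure advances Prepare -> FlushRegion -> Prune, skipping the flush step when no region lags behind. The sketch below is only a schematic of that state progression, not the actual `Procedure::execute` implementation, which returns `Status` and persists `WalPruneData` between steps; the `Done` marker stands in for returning `Status::done()`.

```rust
// Illustrative driver only.
#[derive(Clone, Copy, Debug)]
enum WalPruneState {
    Prepare,
    FlushRegion,
    Prune,
    Done,
}

struct Sketch {
    state: WalPruneState,
    regions_to_flush: Vec<u64>,
}

impl Sketch {
    fn step(&mut self) {
        self.state = match self.state {
            // on_prepare: compute prunable_entry_id and regions_to_flush.
            WalPruneState::Prepare => {
                if self.regions_to_flush.is_empty() {
                    WalPruneState::Prune
                } else {
                    WalPruneState::FlushRegion
                }
            }
            // on_sending_flush_request: mail flush instructions to datanodes.
            WalPruneState::FlushRegion => WalPruneState::Prune,
            // on_prune: persist the pruned entry id, then delete Kafka records.
            WalPruneState::Prune => WalPruneState::Done,
            WalPruneState::Done => WalPruneState::Done,
        };
    }
}

fn main() {
    let mut p = Sketch { state: WalPruneState::Prepare, regions_to_flush: vec![42] };
    p.step(); // Prepare -> FlushRegion
    p.step(); // FlushRegion -> Prune
    p.step(); // Prune -> Done
    assert!(matches!(p.state, WalPruneState::Done));
}
```
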
diff --git a/src/meta-srv/src/procedure/wal_prune/manager.rs b/src/meta-srv/src/procedure/wal_prune/manager.rs
new file mode 100644
index 0000000000..8e5072ad11
--- /dev/null
+++ b/src/meta-srv/src/procedure/wal_prune/manager.rs
@@ -0,0 +1,438 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::hash_set::Entry;
+use std::collections::HashSet;
+use std::fmt::{Debug, Formatter};
+use std::sync::{Arc, Mutex, RwLock};
+use std::time::Duration;
+
+use common_meta::key::TableMetadataManagerRef;
+use common_meta::leadership_notifier::LeadershipChangeListener;
+use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
+use common_runtime::JoinHandle;
+use common_telemetry::{error, info, warn};
+use futures::future::join_all;
+use snafu::{OptionExt, ResultExt};
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::sync::Semaphore;
+use tokio::time::{interval_at, Instant, MissedTickBehavior};
+
+use crate::error::{self, Result};
+use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
+use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
+
+pub type WalPruneTickerRef = Arc<WalPruneTicker>;
+
+/// Tracks running [WalPruneProcedure]s and the resources they hold.
+/// A [WalPruneProcedure] holds a semaphore permit, which limits the number of concurrent procedures.
+///
+/// TODO(CookiePie): Similar to [RegionMigrationProcedureTracker], this could be refactored into a unified framework.
+#[derive(Clone)]
+pub struct WalPruneProcedureTracker {
+ running_procedures: Arc<RwLock<HashSet<String>>>,
+}
+
+impl WalPruneProcedureTracker {
+ /// Inserts a running [WalPruneProcedure] for the given topic name and returns a guard.
+ /// Returns `None` if a procedure for the topic is already running.
+ pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
+ let mut running_procedures = self.running_procedures.write().unwrap();
+ match running_procedures.entry(topic_name.clone()) {
+ Entry::Occupied(_) => None,
+ Entry::Vacant(entry) => {
+ entry.insert();
+ Some(WalPruneProcedureGuard {
+ topic_name,
+ running_procedures: self.running_procedures.clone(),
+ })
+ }
+ }
+ }
+
+ /// Number of running [WalPruneProcedure]s.
+ pub fn len(&self) -> usize {
+ self.running_procedures.read().unwrap().len()
+ }
+}
+
+/// [WalPruneProcedureGuard] is a guard for [WalPruneProcedure].
+/// It is used to track the running [WalPruneProcedure]s.
+/// When the guard is dropped, it will remove the topic name from the running procedures and release the semaphore.
+pub struct WalPruneProcedureGuard {
+ topic_name: String,
+ running_procedures: Arc<RwLock<HashSet<String>>>,
+}
+
+impl Drop for WalPruneProcedureGuard {
+ fn drop(&mut self) {
+ let mut running_procedures = self.running_procedures.write().unwrap();
+ running_procedures.remove(&self.topic_name);
+ }
+}
+
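
The tracker/guard pair is a small RAII pattern: inserting a topic succeeds at most once, and dropping the guard frees the slot for the next procedure. Below is a self-contained sketch of the same idea using only `std` types, not the actual metasrv structs.

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

#[derive(Clone, Default)]
struct Tracker {
    running: Arc<RwLock<HashSet<String>>>,
}

struct Guard {
    topic: String,
    running: Arc<RwLock<HashSet<String>>>,
}

impl Tracker {
    /// Returns a guard if no procedure for `topic` is running yet.
    fn insert(&self, topic: String) -> Option<Guard> {
        let mut running = self.running.write().unwrap();
        if running.insert(topic.clone()) {
            Some(Guard { topic, running: self.running.clone() })
        } else {
            None
        }
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Free the slot so a new prune procedure for this topic can start.
        self.running.write().unwrap().remove(&self.topic);
    }
}

fn main() {
    let tracker = Tracker::default();
    let guard = tracker.insert("greptimedb_wal_topic_0".to_string());
    assert!(guard.is_some());
    // A second procedure for the same topic is rejected while the first runs.
    assert!(tracker.insert("greptimedb_wal_topic_0".to_string()).is_none());
    drop(guard);
    assert!(tracker.insert("greptimedb_wal_topic_0".to_string()).is_some());
}
```
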
+/// Event is used to notify the [WalPruneManager] to do some work.
+///
+/// - `Tick`: Trigger a submission of [WalPruneProcedure] to prune remote WAL.
+pub enum Event {
+ Tick,
+}
+
+impl Debug for Event {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Event::Tick => write!(f, "Tick"),
+ }
+ }
+}
+
+/// [WalPruneTicker] is a ticker that periodically sends [Event]s to the [WalPruneManager].
+/// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s.
+pub(crate) struct WalPruneTicker {
+ /// Handle of ticker thread.
+ pub(crate) tick_handle: Mutex