From e817a65d755d04ab76b92d626a6b9bc1ef82db56 Mon Sep 17 00:00:00 2001 From: Yuhan Wang Date: Sat, 19 Apr 2025 00:02:33 +0800 Subject: [PATCH] feat: enable submitting wal prune procedure periodically (#5867) * feat: enable submitting wal prune procedure periodically * chore: fix and add options * test: add unit test * test: fix unit test * test: enable active_wal_pruning in test * test: update default config * chore: update config name * refactor: use semaphore to control the number of prune process * refactor: use split client for wal prune manager and topic creator * chore: add configs * chore: apply review comments * fix: use tracker properly * fix: use guard to track semaphore * test: update unit tests * chore: update config name * chore: use prunable_entry_id * refactor: semaphore to only limit the process of submitting * chore: remove legacy sort * chore: better configs * fix: update config.md * chore: respect fmt * test: update unit tests * chore: use interval_at * fix: fix unit test * test: fix unit test * test: fix unit test * chore: apply review comments * docs: update config docs --- .../with-remote-wal.yaml | 4 +- config/config.md | 3 + config/metasrv.example.toml | 16 + src/cmd/src/standalone.rs | 2 + src/common/meta/src/datanode.rs | 9 + src/common/meta/src/region_registry.rs | 53 ++- src/common/meta/src/wal_options_allocator.rs | 4 +- .../wal_options_allocator/topic_creator.rs | 21 +- src/common/wal/src/config.rs | 26 ++ src/common/wal/src/config/kafka/common.rs | 7 + src/common/wal/src/config/kafka/datanode.rs | 18 +- src/common/wal/src/config/kafka/metasrv.rs | 17 +- src/meta-srv/src/error.rs | 19 +- .../handler/collect_leader_region_handler.rs | 9 +- src/meta-srv/src/handler/failure_handler.rs | 2 + .../src/handler/region_lease_handler.rs | 2 + src/meta-srv/src/lib.rs | 1 + src/meta-srv/src/metasrv.rs | 5 + src/meta-srv/src/metasrv/builder.rs | 39 +- src/meta-srv/src/metrics.rs | 3 + src/meta-srv/src/procedure/test_util.rs | 19 +- src/meta-srv/src/procedure/wal_prune.rs | 261 +++++------ .../src/procedure/wal_prune/manager.rs | 438 ++++++++++++++++++ .../src/procedure/wal_prune/test_util.rs | 94 ++++ src/meta-srv/src/selector/weight_compute.rs | 6 + tests/conf/metasrv-test.toml.template | 1 + 26 files changed, 881 insertions(+), 198 deletions(-) create mode 100644 src/meta-srv/src/procedure/wal_prune/manager.rs create mode 100644 src/meta-srv/src/procedure/wal_prune/test_util.rs diff --git a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml index a97f921f8c..58cc188985 100644 --- a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml +++ b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml @@ -2,13 +2,13 @@ meta: configData: |- [runtime] global_rt_size = 4 - + [wal] provider = "kafka" broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"] num_topics = 3 + auto_prune_topic_records = true - [datanode] [datanode.client] timeout = "120s" diff --git a/config/config.md b/config/config.md index d0d7582db5..f34a41d861 100644 --- a/config/config.md +++ b/config/config.md @@ -343,6 +343,9 @@ | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | | `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | +| `wal.auto_prune_interval` | String | `0s` | Interval of automatic WAL pruning.
Set to `0s` to disable automatic WAL pruning, which deletes unused remote WAL entries periodically. | +| `wal.trigger_flush_threshold` | Integer | `0` | The threshold to trigger a flush operation of a region in automatic WAL pruning.
Metasrv will send a flush request to the region when:
`trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`
where:
- `prunable_entry_id` is the maximum entry id of the region that can be pruned.
- `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.
Set to `0` to disable the flush operation. | +| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatic WAL pruning. | | `wal.num_topics` | Integer | `64` | Number of topics. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | | `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
Only accepts strings that match the following regular expression pattern:
[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 0eb9900c2a..89c92352b2 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -130,6 +130,22 @@ broker_endpoints = ["127.0.0.1:9092"] ## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` auto_create_topics = true +## Interval of automatically WAL pruning. +## Set to `0s` to disable automatically WAL pruning which delete unused remote WAL entries periodically. +auto_prune_interval = "0s" + +## The threshold to trigger a flush operation of a region in automatically WAL pruning. +## Metasrv will send a flush request to flush the region when: +## `trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id` +## where: +## - `prunable_entry_id` is the maximum entry id that can be pruned of the region. +## - `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic. +## Set to `0` to disable the flush operation. +trigger_flush_threshold = 0 + +## Concurrent task limit for automatically WAL pruning. +auto_prune_parallelism = 10 + ## Number of topics. num_topics = 64 diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 3177a2446f..320a2849ed 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -779,6 +779,8 @@ impl InformationExtension for StandaloneInformationExtension { sst_size: region_stat.sst_size, index_size: region_stat.index_size, region_manifest: region_stat.manifest.into(), + data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id, + metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id, } }) .collect::>(); diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index 2f45e6bdb9..499ed865a2 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs @@ -94,6 +94,13 @@ pub struct RegionStat { pub index_size: u64, /// The manifest infoof the region. pub region_manifest: RegionManifestInfo, + /// The latest entry id of topic used by data. + /// **Only used by remote WAL prune.** + pub data_topic_latest_entry_id: u64, + /// The latest entry id of topic used by metadata. + /// **Only used by remote WAL prune.** + /// In mito engine, this is the same as `data_topic_latest_entry_id`. + pub metadata_topic_latest_entry_id: u64, } #[derive(Debug, Clone, Copy, Serialize, Deserialize)] @@ -264,6 +271,8 @@ impl From<&api::v1::meta::RegionStat> for RegionStat { sst_size: region_stat.sst_size, index_size: region_stat.index_size, region_manifest: region_stat.manifest.into(), + data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id, + metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id, } } } diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs index 344e0fef5b..49d97f8a2f 100644 --- a/src/common/meta/src/region_registry.rs +++ b/src/common/meta/src/region_registry.rs @@ -19,7 +19,7 @@ use std::sync::{Arc, RwLock}; use common_telemetry::warn; use store_api::storage::RegionId; -use crate::datanode::RegionManifestInfo; +use crate::datanode::{RegionManifestInfo, RegionStat}; /// Represents information about a leader region in the cluster. 
/// Contains the datanode id where the leader is located, @@ -35,25 +35,22 @@ pub enum LeaderRegionManifestInfo { Mito { manifest_version: u64, flushed_entry_id: u64, + topic_latest_entry_id: u64, }, Metric { data_manifest_version: u64, data_flushed_entry_id: u64, + data_topic_latest_entry_id: u64, metadata_manifest_version: u64, metadata_flushed_entry_id: u64, + metadata_topic_latest_entry_id: u64, }, } -impl From for LeaderRegionManifestInfo { - fn from(value: RegionManifestInfo) -> Self { - match value { - RegionManifestInfo::Mito { - manifest_version, - flushed_entry_id, - } => LeaderRegionManifestInfo::Mito { - manifest_version, - flushed_entry_id, - }, +impl LeaderRegionManifestInfo { + /// Generate a [LeaderRegionManifestInfo] from [RegionStat]. + pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo { + match region_stat.region_manifest { RegionManifestInfo::Metric { data_manifest_version, data_flushed_entry_id, @@ -62,14 +59,22 @@ impl From for LeaderRegionManifestInfo { } => LeaderRegionManifestInfo::Metric { data_manifest_version, data_flushed_entry_id, + data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id, metadata_manifest_version, metadata_flushed_entry_id, + metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id, + }, + RegionManifestInfo::Mito { + manifest_version, + flushed_entry_id, + } => LeaderRegionManifestInfo::Mito { + manifest_version, + flushed_entry_id, + topic_latest_entry_id: region_stat.data_topic_latest_entry_id, }, } } -} -impl LeaderRegionManifestInfo { /// Returns the manifest version of the leader region. pub fn manifest_version(&self) -> u64 { match self { @@ -96,17 +101,33 @@ impl LeaderRegionManifestInfo { } } - /// Returns the minimum flushed entry id of the leader region. - pub fn min_flushed_entry_id(&self) -> u64 { + /// Returns prunable entry id of the leader region. + /// It is used to determine the entry id that can be pruned in remote wal. + /// + /// For a mito region, the prunable entry id should max(flushed_entry_id, latest_entry_id_since_flush). + /// + /// For a metric region, the prunable entry id should min( + /// max(data_flushed_entry_id, data_latest_entry_id_since_flush), + /// max(metadata_flushed_entry_id, metadata_latest_entry_id_since_flush) + /// ). + pub fn prunable_entry_id(&self) -> u64 { match self { LeaderRegionManifestInfo::Mito { flushed_entry_id, .. } => *flushed_entry_id, LeaderRegionManifestInfo::Metric { data_flushed_entry_id, + data_topic_latest_entry_id, metadata_flushed_entry_id, + metadata_topic_latest_entry_id, .. 
- } => (*data_flushed_entry_id).min(*metadata_flushed_entry_id), + } => { + let data_prunable_entry_id = + (*data_flushed_entry_id).max(*data_topic_latest_entry_id); + let metadata_prunable_entry_id = + (*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id); + data_prunable_entry_id.min(metadata_prunable_entry_id) + } } } } diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index 2aba2a5ee3..a6e1482f04 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -30,7 +30,9 @@ use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result}; use crate::key::NAME_PATTERN_REGEX; use crate::kv_backend::KvBackendRef; use crate::leadership_notifier::LeadershipChangeListener; -pub use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator; +pub use crate::wal_options_allocator::topic_creator::{ + build_kafka_client, build_kafka_topic_creator, +}; use crate::wal_options_allocator::topic_pool::KafkaTopicPool; /// Allocates wal options in region granularity. diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs index f49d1bf1ca..1a023546d3 100644 --- a/src/common/meta/src/wal_options_allocator/topic_creator.rs +++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use common_telemetry::{error, info}; use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG; use common_wal::config::kafka::MetasrvKafkaConfig; @@ -34,11 +32,9 @@ use crate::error::{ // The `DEFAULT_PARTITION` refers to the index of the partition. const DEFAULT_PARTITION: i32 = 0; -type KafkaClientRef = Arc; - /// Creates topics in kafka. pub struct KafkaTopicCreator { - client: KafkaClientRef, + client: Client, /// The number of partitions per topic. num_partitions: i32, /// The replication factor of each topic. @@ -48,7 +44,7 @@ pub struct KafkaTopicCreator { } impl KafkaTopicCreator { - pub fn client(&self) -> &KafkaClientRef { + pub fn client(&self) -> &Client { &self.client } @@ -133,7 +129,8 @@ impl KafkaTopicCreator { } } -pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result { +/// Builds a kafka [Client](rskafka::client::Client). +pub async fn build_kafka_client(config: &MetasrvKafkaConfig) -> Result { // Builds an kafka controller client for creating topics. 
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints) .await @@ -145,15 +142,19 @@ pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result Result { + let client = build_kafka_client(config).await?; Ok(KafkaTopicCreator { - client: Arc::new(client), + client, num_partitions: config.kafka_topic.num_partitions, replication_factor: config.kafka_topic.replication_factor, create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32, diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 831faef772..921c1faaa3 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -15,6 +15,8 @@ pub mod kafka; pub mod raft_engine; +use std::time::Duration; + use serde::{Deserialize, Serialize}; use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; @@ -53,11 +55,32 @@ impl From for MetasrvWalConfig { connection: config.connection, kafka_topic: config.kafka_topic, auto_create_topics: config.auto_create_topics, + auto_prune_interval: config.auto_prune_interval, + trigger_flush_threshold: config.trigger_flush_threshold, + auto_prune_parallelism: config.auto_prune_parallelism, }), } } } +impl MetasrvWalConfig { + /// Returns if active wal pruning is enabled. + pub fn enable_active_wal_pruning(&self) -> bool { + match self { + MetasrvWalConfig::RaftEngine => false, + MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO, + } + } + + /// Gets the kafka connection config. + pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> { + match self { + MetasrvWalConfig::RaftEngine => None, + MetasrvWalConfig::Kafka(config) => Some(config), + } + } +} + impl From for DatanodeWalConfig { fn from(config: MetasrvWalConfig) -> Self { match config { @@ -181,6 +204,9 @@ mod tests { create_topic_timeout: Duration::from_secs(30), }, auto_create_topics: true, + auto_prune_interval: Duration::from_secs(0), + trigger_flush_threshold: 0, + auto_prune_parallelism: 10, }; assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index ea58a3d49e..6b2c9992f4 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -30,6 +30,13 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig { deadline: Some(Duration::from_secs(120)), }; +/// Default interval for active WAL pruning. +pub const DEFAULT_ACTIVE_PRUNE_INTERVAL: Duration = Duration::ZERO; +/// Default limit for concurrent active pruning tasks. +pub const DEFAULT_ACTIVE_PRUNE_TASK_LIMIT: usize = 10; +/// Default interval for sending flush request to regions when pruning remote WAL. 
+pub const DEFAULT_TRIGGER_FLUSH_THRESHOLD: u64 = 0; + use crate::error::{self, Result}; use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX}; diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 77cf05397d..dd659d636e 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -17,7 +17,10 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; +use crate::config::kafka::common::{ + KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_ACTIVE_PRUNE_INTERVAL, + DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, DEFAULT_TRIGGER_FLUSH_THRESHOLD, +}; /// Kafka wal configurations for datanode. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -44,6 +47,15 @@ pub struct DatanodeKafkaConfig { pub dump_index_interval: Duration, /// Ignore missing entries during read WAL. pub overwrite_entry_start_id: bool, + // Active WAL pruning. + pub auto_prune_topic_records: bool, + // Interval of WAL pruning. + pub auto_prune_interval: Duration, + // Threshold for sending flush request when pruning remote WAL. + // `None` stands for never sending flush request. + pub trigger_flush_threshold: u64, + // Limit of concurrent active pruning procedures. + pub auto_prune_parallelism: usize, } impl Default for DatanodeKafkaConfig { @@ -58,6 +70,10 @@ impl Default for DatanodeKafkaConfig { create_index: true, dump_index_interval: Duration::from_secs(60), overwrite_entry_start_id: false, + auto_prune_topic_records: false, + auto_prune_interval: DEFAULT_ACTIVE_PRUNE_INTERVAL, + trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD, + auto_prune_parallelism: DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, } } } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index 27df3569b8..acbfbe05c6 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -12,9 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use serde::{Deserialize, Serialize}; -use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; +use crate::config::kafka::common::{ + KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_ACTIVE_PRUNE_INTERVAL, + DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, DEFAULT_TRIGGER_FLUSH_THRESHOLD, +}; /// Kafka wal configurations for metasrv. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -28,6 +33,13 @@ pub struct MetasrvKafkaConfig { pub kafka_topic: KafkaTopicConfig, // Automatically create topics for WAL. pub auto_create_topics: bool, + // Interval of WAL pruning. + pub auto_prune_interval: Duration, + // Threshold for sending flush request when pruning remote WAL. + // `None` stands for never sending flush request. + pub trigger_flush_threshold: u64, + // Limit of concurrent active pruning procedures. 
+ pub auto_prune_parallelism: usize, } impl Default for MetasrvKafkaConfig { @@ -36,6 +48,9 @@ impl Default for MetasrvKafkaConfig { connection: Default::default(), kafka_topic: Default::default(), auto_create_topics: true, + auto_prune_interval: DEFAULT_ACTIVE_PRUNE_INTERVAL, + trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD, + auto_prune_parallelism: DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, } } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 7c45fa408f..bd249c5178 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -518,6 +518,13 @@ pub enum Error { source: common_procedure::Error, }, + #[snafu(display("A prune task for topic {} is already running", topic))] + PruneTaskAlreadyRunning { + topic: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Schema already exists, name: {schema_name}"))] SchemaAlreadyExists { schema_name: String, @@ -788,6 +795,14 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to build kafka client."))] + BuildKafkaClient { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: common_meta::error::Error, + }, + #[snafu(display( "Failed to build a Kafka partition client, topic: {}, partition: {}", topic, @@ -875,7 +890,9 @@ impl ErrorExt for Error { | Error::FlowStateHandler { .. } | Error::BuildWalOptionsAllocator { .. } | Error::BuildPartitionClient { .. } - | Error::DeleteRecords { .. } => StatusCode::Internal, + | Error::BuildKafkaClient { .. } + | Error::DeleteRecords { .. } + | Error::PruneTaskAlreadyRunning { .. } => StatusCode::Internal, Error::Unsupported { .. } => StatusCode::Unsupported, diff --git a/src/meta-srv/src/handler/collect_leader_region_handler.rs b/src/meta-srv/src/handler/collect_leader_region_handler.rs index fd5fab3639..13aee5d234 100644 --- a/src/meta-srv/src/handler/collect_leader_region_handler.rs +++ b/src/meta-srv/src/handler/collect_leader_region_handler.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use api::v1::meta::{HeartbeatRequest, Role}; -use common_meta::region_registry::LeaderRegion; +use common_meta::region_registry::{LeaderRegion, LeaderRegionManifestInfo}; use store_api::region_engine::RegionRole; use crate::error::Result; @@ -44,7 +44,7 @@ impl HeartbeatHandler for CollectLeaderRegionHandler { continue; } - let manifest = stat.region_manifest.into(); + let manifest = LeaderRegionManifestInfo::from_region_stat(stat); let value = LeaderRegion { datanode_id: current_stat.id, manifest, @@ -122,6 +122,8 @@ mod tests { manifest_size: 0, sst_size: 0, index_size: 0, + data_topic_latest_entry_id: 0, + metadata_topic_latest_entry_id: 0, } } @@ -161,6 +163,7 @@ mod tests { manifest: LeaderRegionManifestInfo::Mito { manifest_version: 1, flushed_entry_id: 0, + topic_latest_entry_id: 0, }, }) ); @@ -192,6 +195,7 @@ mod tests { manifest: LeaderRegionManifestInfo::Mito { manifest_version: 2, flushed_entry_id: 0, + topic_latest_entry_id: 0, }, }) ); @@ -224,6 +228,7 @@ mod tests { manifest: LeaderRegionManifestInfo::Mito { manifest_version: 2, flushed_entry_id: 0, + topic_latest_entry_id: 0, }, }) ); diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 81e9bfaeb9..cdcd9d3228 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -102,6 +102,8 @@ mod tests { manifest_version: 0, flushed_entry_id: 0, }, + data_topic_latest_entry_id: 0, + metadata_topic_latest_entry_id: 0, } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index b89a570b80..d900078897 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -160,6 +160,8 @@ mod test { manifest_version: 0, flushed_entry_id: 0, }, + data_topic_latest_entry_id: 0, + metadata_topic_latest_entry_id: 0, } } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 9a9f0861a8..4b61eeeae3 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -15,6 +15,7 @@ #![feature(result_flattening)] #![feature(assert_matches)] #![feature(extract_if)] +#![feature(hash_set_entry)] pub mod bootstrap; pub mod cache_invalidator; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 00ac628b61..34b3cac25e 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -61,6 +61,7 @@ use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef}; use crate::lease::lookup_datanode_peer; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; +use crate::procedure::wal_prune::manager::WalPruneTickerRef; use crate::procedure::ProcedureManagerListenerAdapter; use crate::pubsub::{PublisherRef, SubscriptionManagerRef}; use crate::region::supervisor::RegionSupervisorTickerRef; @@ -407,6 +408,7 @@ pub struct Metasrv { region_supervisor_ticker: Option, cache_invalidator: CacheInvalidatorRef, leader_region_registry: LeaderRegionRegistryRef, + wal_prune_ticker: Option, plugins: Plugins, } @@ -461,6 +463,9 @@ impl Metasrv { if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker { leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _); } + if let Some(wal_prune_ticker) = &self.wal_prune_ticker { + leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _); + } if let Some(customizer) = self.plugins.get::() { 
customizer.customize(&mut leadership_change_notifier); } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 0f1a04e47b..ec8f6ef253 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -37,7 +37,7 @@ use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; -use common_meta::wal_options_allocator::build_wal_options_allocator; +use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; @@ -58,6 +58,8 @@ use crate::metasrv::{ }; use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; +use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker}; +use crate::procedure::wal_prune::Context as WalPruneContext; use crate::region::supervisor::{ HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker, DEFAULT_TICK_INTERVAL, @@ -346,6 +348,40 @@ impl MetasrvBuilder { .context(error::InitDdlManagerSnafu)?, ); + // remote WAL prune ticker and manager + let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() { + let (tx, rx) = WalPruneManager::channel(); + // Safety: Must be remote WAL. + let remote_wal_options = options.wal.remote_wal_options().unwrap(); + let kafka_client = build_kafka_client(remote_wal_options) + .await + .context(error::BuildKafkaClientSnafu)?; + let wal_prune_context = WalPruneContext { + client: Arc::new(kafka_client), + table_metadata_manager: table_metadata_manager.clone(), + leader_region_registry: leader_region_registry.clone(), + server_addr: options.server_addr.clone(), + mailbox: mailbox.clone(), + }; + let wal_prune_manager = WalPruneManager::new( + table_metadata_manager.clone(), + remote_wal_options.auto_prune_parallelism, + rx, + procedure_manager.clone(), + wal_prune_context, + remote_wal_options.trigger_flush_threshold, + ); + // Start manager in background. Ticker will be started in the main thread to send ticks. + wal_prune_manager.try_start().await?; + let wal_prune_ticker = Arc::new(WalPruneTicker::new( + remote_wal_options.auto_prune_interval, + tx.clone(), + )); + Some(wal_prune_ticker) + } else { + None + }; + let customized_region_lease_renewer = plugins .as_ref() .and_then(|plugins| plugins.get::()); @@ -406,6 +442,7 @@ impl MetasrvBuilder { region_supervisor_ticker, cache_invalidator, leader_region_registry, + wal_prune_ticker, }) } } diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index c5e2d3df4d..a9750ae5f3 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -66,4 +66,7 @@ lazy_static! { // The heartbeat rate counter. pub static ref METRIC_META_HEARTBEAT_RATE: IntCounter = register_int_counter!("greptime_meta_heartbeat_rate", "meta heartbeat arrival rate").unwrap(); + /// The remote WAL prune execute counter. 
+ pub static ref METRIC_META_REMOTE_WAL_PRUNE_EXECUTE: IntCounterVec = + register_int_counter_vec!("greptime_meta_remote_wal_prune_execute", "meta remote wal prune execute", &["topic_name"]).unwrap(); } diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 34ce23abd4..ca6da59f2a 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -179,8 +179,8 @@ pub async fn new_wal_prune_metadata( ) -> (EntryId, Vec) { let datanode_id = 1; let from_peer = Peer::empty(datanode_id); - let mut min_flushed_entry_id = u64::MAX; - let mut max_flushed_entry_id = 0; + let mut min_prunable_entry_id = u64::MAX; + let mut max_prunable_entry_id = 0; let mut region_entry_ids = HashMap::with_capacity(n_table as usize * n_region as usize); for table_id in 0..n_table { let region_ids = (0..n_region) @@ -221,10 +221,10 @@ pub async fn new_wal_prune_metadata( .iter() .map(|region_id| { let rand_n = rand::random::() as usize; - let current_flushed_entry_id = offsets[rand_n % offsets.len()] as u64; - min_flushed_entry_id = min_flushed_entry_id.min(current_flushed_entry_id); - max_flushed_entry_id = max_flushed_entry_id.max(current_flushed_entry_id); - (*region_id, current_flushed_entry_id) + let current_prunable_entry_id = offsets[rand_n % offsets.len()] as u64; + min_prunable_entry_id = min_prunable_entry_id.min(current_prunable_entry_id); + max_prunable_entry_id = max_prunable_entry_id.max(current_prunable_entry_id); + (*region_id, current_prunable_entry_id) }) .collect::>(); region_entry_ids.extend(current_region_entry_ids.clone()); @@ -235,15 +235,15 @@ pub async fn new_wal_prune_metadata( let regions_to_flush = region_entry_ids .iter() - .filter_map(|(region_id, flushed_entry_id)| { - if max_flushed_entry_id - flushed_entry_id > threshold { + .filter_map(|(region_id, prunable_entry_id)| { + if max_prunable_entry_id - prunable_entry_id > threshold { Some(*region_id) } else { None } }) .collect::>(); - (min_flushed_entry_id, regions_to_flush) + (min_prunable_entry_id, regions_to_flush) } pub async fn update_in_memory_region_flushed_entry_id( @@ -257,6 +257,7 @@ pub async fn update_in_memory_region_flushed_entry_id( manifest: LeaderRegionManifestInfo::Mito { manifest_version: 0, flushed_entry_id, + topic_latest_entry_id: 0, }, }; key_values.push((region_id, value)); diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs index b7f145fffc..0247c928ec 100644 --- a/src/meta-srv/src/procedure/wal_prune.rs +++ b/src/meta-srv/src/procedure/wal_prune.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+pub(crate) mod manager; +#[cfg(test)] +mod test_util; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; @@ -28,9 +32,10 @@ use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, StringKey, }; -use common_telemetry::warn; +use common_telemetry::{info, warn}; use itertools::{Itertools, MinMaxResult}; use log_store::kafka::DEFAULT_PARTITION; +use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker}; use rskafka::client::partition::UnknownTopicHandling; use rskafka::client::Client; use serde::{Deserialize, Serialize}; @@ -45,9 +50,8 @@ use crate::error::{ use crate::service::mailbox::{Channel, MailboxRef}; use crate::Result; -type KafkaClientRef = Arc; +pub type KafkaClientRef = Arc; -/// No timeout for flush request. const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(1); /// The state of WAL pruning. @@ -58,17 +62,18 @@ pub enum WalPruneState { Prune, } +#[derive(Clone)] pub struct Context { /// The Kafka client. - client: KafkaClientRef, + pub client: KafkaClientRef, /// The table metadata manager. - table_metadata_manager: TableMetadataManagerRef, + pub table_metadata_manager: TableMetadataManagerRef, /// The leader region registry. - leader_region_registry: LeaderRegionRegistryRef, + pub leader_region_registry: LeaderRegionRegistryRef, /// Server address of metasrv. - server_addr: String, + pub server_addr: String, /// The mailbox to send messages. - mailbox: MailboxRef, + pub mailbox: MailboxRef, } /// The data of WAL pruning. @@ -77,10 +82,11 @@ pub struct WalPruneData { /// The topic name to prune. pub topic: String, /// The minimum flush entry id for topic, which is used to prune the WAL. - pub min_flushed_entry_id: EntryId, + pub prunable_entry_id: EntryId, pub regions_to_flush: Vec, - /// If `flushed_entry_id` + `trigger_flush_threshold` < `max_flushed_entry_id`, send a flush request to the region. - pub trigger_flush_threshold: Option, + /// If `prunable_entry_id` + `trigger_flush_threshold` < `max_prunable_entry_id`, send a flush request to the region. + /// If `None`, never send flush requests. + pub trigger_flush_threshold: u64, /// The state. 
pub state: WalPruneState, } @@ -89,27 +95,43 @@ pub struct WalPruneData { pub struct WalPruneProcedure { pub data: WalPruneData, pub context: Context, + pub _guard: Option, } impl WalPruneProcedure { const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune"; - pub fn new(topic: String, context: Context, trigger_flush_threshold: Option) -> Self { + pub fn new( + topic: String, + context: Context, + trigger_flush_threshold: u64, + guard: Option, + ) -> Self { Self { data: WalPruneData { topic, - min_flushed_entry_id: 0, + prunable_entry_id: 0, trigger_flush_threshold, regions_to_flush: vec![], state: WalPruneState::Prepare, }, context, + _guard: guard, } } - pub fn from_json(json: &str, context: Context) -> ProcedureResult { + pub fn from_json( + json: &str, + context: &Context, + tracker: WalPruneProcedureTracker, + ) -> ProcedureResult { let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?; - Ok(Self { data, context }) + let guard = tracker.insert_running_procedure(data.topic.clone()); + Ok(Self { + data, + context: context.clone(), + _guard: guard, + }) } async fn build_peer_to_region_ids_map( @@ -182,20 +204,20 @@ impl WalPruneProcedure { .with_context(|_| error::RetryLaterWithSourceSnafu { reason: "Failed to get topic-region map", })?; - let flush_entry_ids_map: HashMap<_, _> = self + let prunable_entry_ids_map: HashMap<_, _> = self .context .leader_region_registry .batch_get(region_ids.iter().cloned()) .into_iter() .map(|(region_id, region)| { - let flushed_entry_id = region.manifest.min_flushed_entry_id(); - (region_id, flushed_entry_id) + let prunable_entry_id = region.manifest.prunable_entry_id(); + (region_id, prunable_entry_id) }) .collect(); - // Check if the `flush_entry_ids_map` contains all region ids. + // Check if the `prunable_entry_ids_map` contains all region ids. let non_collected_region_ids = - check_heartbeat_collected_region_ids(®ion_ids, &flush_entry_ids_map); + check_heartbeat_collected_region_ids(®ion_ids, &prunable_entry_ids_map); if !non_collected_region_ids.is_empty() { // The heartbeat collected region ids do not contain all region ids in the topic-region map. // In this case, we should not prune the WAL. 
@@ -204,23 +226,23 @@ impl WalPruneProcedure { return Ok(Status::done()); } - let min_max_result = flush_entry_ids_map.values().minmax(); - let max_flushed_entry_id = match min_max_result { + let min_max_result = prunable_entry_ids_map.values().minmax(); + let max_prunable_entry_id = match min_max_result { MinMaxResult::NoElements => { return Ok(Status::done()); } - MinMaxResult::OneElement(flushed_entry_id) => { - self.data.min_flushed_entry_id = *flushed_entry_id; - *flushed_entry_id + MinMaxResult::OneElement(prunable_entry_id) => { + self.data.prunable_entry_id = *prunable_entry_id; + *prunable_entry_id } - MinMaxResult::MinMax(min_flushed_entry_id, max_flushed_entry_id) => { - self.data.min_flushed_entry_id = *min_flushed_entry_id; - *max_flushed_entry_id + MinMaxResult::MinMax(min_prunable_entry_id, max_prunable_entry_id) => { + self.data.prunable_entry_id = *min_prunable_entry_id; + *max_prunable_entry_id } }; - if let Some(threshold) = self.data.trigger_flush_threshold { - for (region_id, flushed_entry_id) in flush_entry_ids_map { - if flushed_entry_id + threshold < max_flushed_entry_id { + if self.data.trigger_flush_threshold != 0 { + for (region_id, prunable_entry_id) in prunable_entry_ids_map { + if prunable_entry_id + self.data.trigger_flush_threshold < max_prunable_entry_id { self.data.regions_to_flush.push(region_id); } } @@ -232,10 +254,17 @@ impl WalPruneProcedure { } /// Send the flush request to regions with low flush entry id. + /// + /// Retry: + /// - Failed to build peer to region ids map. It means failure in retrieving metadata. pub async fn on_sending_flush_request(&mut self) -> Result { let peer_to_region_ids_map = self .build_peer_to_region_ids_map(&self.context, &self.data.regions_to_flush) - .await?; + .await + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: "Failed to build peer to region ids map", + })?; let flush_instructions = self.build_flush_region_instruction(peer_to_region_ids_map)?; for (peer, flush_instruction) in flush_instructions.into_iter() { let msg = MailboxMessage::json_message( @@ -255,13 +284,13 @@ impl WalPruneProcedure { Ok(Status::executing(true)) } - /// Prune the WAL and persist the minimum flushed entry id. + /// Prune the WAL and persist the minimum prunable entry id. /// /// Retry: - /// - Failed to update the minimum flushed entry id in kvbackend. + /// - Failed to update the minimum prunable entry id in kvbackend. /// - Failed to delete records. pub async fn on_prune(&mut self) -> Result { - // Safety: flushed_entry_ids are loaded in on_prepare. + // Safety: `prunable_entry_id`` are loaded in on_prepare. let partition_client = self .context .client @@ -276,7 +305,7 @@ impl WalPruneProcedure { partition: DEFAULT_PARTITION, })?; - // Should update the min flushed entry id in the kv backend before deleting records. + // Should update the min prunable entry id in the kv backend before deleting records. // Otherwise, when a datanode restarts, it will not be able to find the wal entries. 
let prev = self .context @@ -292,7 +321,7 @@ impl WalPruneProcedure { self.context .table_metadata_manager .topic_name_manager() - .update(&self.data.topic, self.data.min_flushed_entry_id, prev) + .update(&self.data.topic, self.data.prunable_entry_id, prev) .await .context(UpdateTopicNameValueSnafu { topic: &self.data.topic, @@ -306,14 +335,14 @@ impl WalPruneProcedure { })?; partition_client .delete_records( - (self.data.min_flushed_entry_id + 1) as i64, + (self.data.prunable_entry_id + 1) as i64, DELETE_RECORDS_TIMEOUT.as_millis() as i32, ) .await .context(DeleteRecordsSnafu { topic: &self.data.topic, partition: DEFAULT_PARTITION, - offset: (self.data.min_flushed_entry_id + 1), + offset: (self.data.prunable_entry_id + 1), }) .map_err(BoxedError::new) .with_context(|_| error::RetryLaterWithSourceSnafu { @@ -321,9 +350,13 @@ impl WalPruneProcedure { "Failed to delete records for topic: {}, partition: {}, offset: {}", self.data.topic, DEFAULT_PARTITION, - self.data.min_flushed_entry_id + 1 + self.data.prunable_entry_id + 1 ), })?; + info!( + "Successfully pruned WAL for topic: {}, entry id: {}", + self.data.topic, self.data.prunable_entry_id + ); Ok(Status::done()) } } @@ -388,123 +421,41 @@ mod tests { use std::assert_matches::assert_matches; use api::v1::meta::HeartbeatResponse; - use common_meta::key::TableMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::region_registry::LeaderRegionRegistry; - use common_meta::sequence::SequenceBuilder; - use common_meta::wal_options_allocator::build_kafka_topic_creator; - use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; - use common_wal::config::kafka::MetasrvKafkaConfig; use common_wal::test_util::run_test_with_kafka_wal; use rskafka::record::Record; use tokio::sync::mpsc::Receiver; use super::*; use crate::handler::HeartbeatMailbox; - use crate::procedure::test_util::{new_wal_prune_metadata, MailboxContext}; - - struct TestEnv { - table_metadata_manager: TableMetadataManagerRef, - leader_region_registry: LeaderRegionRegistryRef, - mailbox: MailboxContext, - server_addr: String, - } - - impl TestEnv { - fn new() -> Self { - let kv_backend = Arc::new(MemoryKvBackend::new()); - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - let leader_region_registry = Arc::new(LeaderRegionRegistry::new()); - let mailbox_sequence = - SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); - - let mailbox_ctx = MailboxContext::new(mailbox_sequence); - - Self { - table_metadata_manager, - leader_region_registry, - mailbox: mailbox_ctx, - server_addr: "localhost".to_string(), - } - } - - fn table_metadata_manager(&self) -> &TableMetadataManagerRef { - &self.table_metadata_manager - } - - fn leader_region_registry(&self) -> &LeaderRegionRegistryRef { - &self.leader_region_registry - } - - fn mailbox_context(&self) -> &MailboxContext { - &self.mailbox - } - - fn server_addr(&self) -> &str { - &self.server_addr - } - } + use crate::procedure::test_util::new_wal_prune_metadata; + // Fix this import to correctly point to the test_util module + use crate::procedure::wal_prune::test_util::TestEnv; /// Mock a test env for testing. /// Including: /// 1. Prepare some data in the table metadata manager and in-memory kv backend. - /// 2. Generate a `WalPruneProcedure` with the test env. - /// 3. Return the procedure, the minimum last entry id to prune and the regions to flush. 
- async fn mock_test_env( - topic: String, - broker_endpoints: Vec, - env: &TestEnv, - ) -> (WalPruneProcedure, u64, Vec) { - // Creates a topic manager. - let kafka_topic = KafkaTopicConfig { - replication_factor: broker_endpoints.len() as i16, - ..Default::default() - }; - let config = MetasrvKafkaConfig { - connection: KafkaConnectionConfig { - broker_endpoints, - ..Default::default() - }, - kafka_topic, - ..Default::default() - }; - let topic_creator = build_kafka_topic_creator(&config).await.unwrap(); - let table_metadata_manager = env.table_metadata_manager().clone(); - let leader_region_registry = env.leader_region_registry().clone(); - let mailbox = env.mailbox_context().mailbox().clone(); - + /// 2. Return the procedure, the minimum last entry id to prune and the regions to flush. + async fn mock_test_data(procedure: &WalPruneProcedure) -> (u64, Vec) { let n_region = 10; let n_table = 5; - let threshold = 10; // 5 entries per region. let offsets = mock_wal_entries( - topic_creator.client().clone(), - &topic, + procedure.context.client.clone(), + &procedure.data.topic, (n_region * n_table * 5) as usize, ) .await; - - let (min_flushed_entry_id, regions_to_flush) = new_wal_prune_metadata( - table_metadata_manager.clone(), - leader_region_registry.clone(), + let (prunable_entry_id, regions_to_flush) = new_wal_prune_metadata( + procedure.context.table_metadata_manager.clone(), + procedure.context.leader_region_registry.clone(), n_region, n_table, &offsets, - threshold, - topic.clone(), + procedure.data.trigger_flush_threshold, + procedure.data.topic.clone(), ) .await; - - let context = Context { - client: topic_creator.client().clone(), - table_metadata_manager, - leader_region_registry, - mailbox, - server_addr: env.server_addr().to_string(), - }; - - let wal_prune_procedure = WalPruneProcedure::new(topic, context, Some(threshold)); - (wal_prune_procedure, min_flushed_entry_id, regions_to_flush) + (prunable_entry_id, regions_to_flush) } fn record(i: usize) -> Record { @@ -603,10 +554,18 @@ mod tests { run_test_with_kafka_wal(|broker_endpoints| { Box::pin(async { common_telemetry::init_default_ut_logging(); - let topic_name = "greptime_test_topic".to_string(); + let mut topic_name = uuid::Uuid::new_v4().to_string(); + // Topic should start with a letter. + topic_name = format!("test_procedure_execution-{}", topic_name); let mut env = TestEnv::new(); - let (mut procedure, min_flushed_entry_id, regions_to_flush) = - mock_test_env(topic_name.clone(), broker_endpoints, &env).await; + let context = env.build_wal_prune_context(broker_endpoints).await; + let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None); + + // Before any data in kvbackend is mocked, should return a retryable error. + let result = procedure.on_prune().await; + assert_matches!(result, Err(e) if e.is_retryable()); + + let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await; // Step 1: Test `on_prepare`. let status = procedure.on_prepare().await.unwrap(); @@ -618,7 +577,7 @@ mod tests { } ); assert_matches!(procedure.data.state, WalPruneState::FlushRegion); - assert_eq!(procedure.data.min_flushed_entry_id, min_flushed_entry_id); + assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id); assert_eq!( procedure.data.regions_to_flush.len(), regions_to_flush.len() @@ -646,34 +605,31 @@ mod tests { // Step 3: Test `on_prune`. 
let status = procedure.on_prune().await.unwrap(); assert_matches!(status, Status::Done { output: None }); - // Check if the entry ids after `min_flushed_entry_id` still exist. + // Check if the entry ids after `prunable_entry_id` still exist. check_entry_id_existence( procedure.context.client.clone(), &topic_name, - procedure.data.min_flushed_entry_id as i64 + 1, + procedure.data.prunable_entry_id as i64 + 1, true, ) .await; - // Check if the entry s before `min_flushed_entry_id` are deleted. + // Check if the entry s before `prunable_entry_id` are deleted. check_entry_id_existence( procedure.context.client.clone(), &topic_name, - procedure.data.min_flushed_entry_id as i64, + procedure.data.prunable_entry_id as i64, false, ) .await; - let min_entry_id = env - .table_metadata_manager() + let value = env + .table_metadata_manager .topic_name_manager() .get(&topic_name) .await .unwrap() .unwrap(); - assert_eq!( - min_entry_id.pruned_entry_id, - procedure.data.min_flushed_entry_id - ); + assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id); // Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails. // Should log a warning and return `Status::Done`. @@ -682,13 +638,10 @@ mod tests { assert_matches!(status, Status::Done { output: None }); // Step 5: Test `on_prepare`, don't flush regions. - procedure.data.trigger_flush_threshold = None; + procedure.data.trigger_flush_threshold = 0; procedure.on_prepare().await.unwrap(); assert_matches!(procedure.data.state, WalPruneState::Prune); - assert_eq!( - min_entry_id.pruned_entry_id, - procedure.data.min_flushed_entry_id - ); + assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id); // Clean up the topic. delete_topic(procedure.context.client, &topic_name).await; diff --git a/src/meta-srv/src/procedure/wal_prune/manager.rs b/src/meta-srv/src/procedure/wal_prune/manager.rs new file mode 100644 index 0000000000..8e5072ad11 --- /dev/null +++ b/src/meta-srv/src/procedure/wal_prune/manager.rs @@ -0,0 +1,438 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::hash_set::Entry; +use std::collections::HashSet; +use std::fmt::{Debug, Formatter}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; + +use common_meta::key::TableMetadataManagerRef; +use common_meta::leadership_notifier::LeadershipChangeListener; +use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; +use common_runtime::JoinHandle; +use common_telemetry::{error, info, warn}; +use futures::future::join_all; +use snafu::{OptionExt, ResultExt}; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::Semaphore; +use tokio::time::{interval_at, Instant, MissedTickBehavior}; + +use crate::error::{self, Result}; +use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE; +use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure}; + +pub type WalPruneTickerRef = Arc; + +/// Tracks running [WalPruneProcedure]s and the resources they hold. +/// A [WalPruneProcedure] is holding a semaphore permit to limit the number of concurrent procedures. +/// +/// TODO(CookiePie): Similar to [RegionMigrationProcedureTracker], maybe can refactor to a unified framework. +#[derive(Clone)] +pub struct WalPruneProcedureTracker { + running_procedures: Arc>>, +} + +impl WalPruneProcedureTracker { + /// Insert a running [WalPruneProcedure] for the given topic name and + /// consume acquire a semaphore permit for the given topic name. + pub fn insert_running_procedure(&self, topic_name: String) -> Option { + let mut running_procedures = self.running_procedures.write().unwrap(); + match running_procedures.entry(topic_name.clone()) { + Entry::Occupied(_) => None, + Entry::Vacant(entry) => { + entry.insert(); + Some(WalPruneProcedureGuard { + topic_name, + running_procedures: self.running_procedures.clone(), + }) + } + } + } + + /// Number of running [WalPruneProcedure]s. + pub fn len(&self) -> usize { + self.running_procedures.read().unwrap().len() + } +} + +/// [WalPruneProcedureGuard] is a guard for [WalPruneProcedure]. +/// It is used to track the running [WalPruneProcedure]s. +/// When the guard is dropped, it will remove the topic name from the running procedures and release the semaphore. +pub struct WalPruneProcedureGuard { + topic_name: String, + running_procedures: Arc>>, +} + +impl Drop for WalPruneProcedureGuard { + fn drop(&mut self) { + let mut running_procedures = self.running_procedures.write().unwrap(); + running_procedures.remove(&self.topic_name); + } +} + +/// Event is used to notify the [WalPruneManager] to do some work. +/// +/// - `Tick`: Trigger a submission of [WalPruneProcedure] to prune remote WAL. +pub enum Event { + Tick, +} + +impl Debug for Event { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Event::Tick => write!(f, "Tick"), + } + } +} + +/// [WalPruneTicker] is a ticker that periodically sends [Event]s to the [WalPruneManager]. +/// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s. +pub(crate) struct WalPruneTicker { + /// Handle of ticker thread. + pub(crate) tick_handle: Mutex>>, + /// The interval of tick. + pub(crate) tick_interval: Duration, + /// Sends [Event]s. 
+ pub(crate) sender: Sender, +} + +#[async_trait::async_trait] +impl LeadershipChangeListener for WalPruneTicker { + fn name(&self) -> &'static str { + "WalPruneTicker" + } + + async fn on_leader_start(&self) -> common_meta::error::Result<()> { + self.start(); + Ok(()) + } + + async fn on_leader_stop(&self) -> common_meta::error::Result<()> { + self.stop(); + Ok(()) + } +} + +/// TODO(CookiePie): Similar to [RegionSupervisorTicker], maybe can refactor to a unified framework. +impl WalPruneTicker { + pub(crate) fn new(tick_interval: Duration, sender: Sender) -> Self { + Self { + tick_handle: Mutex::new(None), + tick_interval, + sender, + } + } + + /// Starts the ticker. + pub fn start(&self) { + let mut handle = self.tick_handle.lock().unwrap(); + if handle.is_none() { + let sender = self.sender.clone(); + let tick_interval = self.tick_interval; + let ticker_loop = tokio::spawn(async move { + let mut interval = interval_at(Instant::now() + tick_interval, tick_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + interval.tick().await; + if sender.send(Event::Tick).await.is_err() { + info!("EventReceiver is dropped, tick loop is stopped"); + break; + } + } + }); + *handle = Some(ticker_loop); + } + info!("WalPruneTicker started."); + } + + /// Stops the ticker. + pub fn stop(&self) { + let mut handle = self.tick_handle.lock().unwrap(); + if let Some(handle) = handle.take() { + handle.abort(); + } + info!("WalPruneTicker stopped."); + } +} + +impl Drop for WalPruneTicker { + fn drop(&mut self) { + self.stop(); + } +} + +/// [WalPruneManager] manages all remote WAL related tasks in metasrv. +/// +/// [WalPruneManager] is responsible for: +/// 1. Registering [WalPruneProcedure] loader in the procedure manager. +/// 2. Periodically receive [Event::Tick] to submit [WalPruneProcedure] to prune remote WAL. +/// 3. Use a semaphore to limit the number of concurrent [WalPruneProcedure]s. +pub(crate) struct WalPruneManager { + /// Table metadata manager to restore topics from kvbackend. + table_metadata_manager: TableMetadataManagerRef, + /// Receives [Event]s. + receiver: Receiver, + /// Procedure manager. + procedure_manager: ProcedureManagerRef, + /// Tracker for running [WalPruneProcedure]s. + tracker: WalPruneProcedureTracker, + /// Semaphore to limit the number of concurrent [WalPruneProcedure]s. + semaphore: Arc, + + /// Context for [WalPruneProcedure]. + wal_prune_context: WalPruneContext, + /// Trigger flush threshold for [WalPruneProcedure]. + /// If `None`, never send flush requests. + trigger_flush_threshold: u64, +} + +impl WalPruneManager { + /// Returns a new empty [WalPruneManager]. + pub fn new( + table_metadata_manager: TableMetadataManagerRef, + limit: usize, + receiver: Receiver, + procedure_manager: ProcedureManagerRef, + wal_prune_context: WalPruneContext, + trigger_flush_threshold: u64, + ) -> Self { + Self { + table_metadata_manager, + receiver, + procedure_manager, + wal_prune_context, + tracker: WalPruneProcedureTracker { + running_procedures: Arc::new(RwLock::new(HashSet::new())), + }, + semaphore: Arc::new(Semaphore::new(limit)), + trigger_flush_threshold, + } + } + + /// Start the [WalPruneManager]. It will register [WalPruneProcedure] loader in the procedure manager. 
+ pub async fn try_start(mut self) -> Result<()> { + let context = self.wal_prune_context.clone(); + let tracker = self.tracker.clone(); + self.procedure_manager + .register_loader( + WalPruneProcedure::TYPE_NAME, + Box::new(move |json| { + let tracker = tracker.clone(); + WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _) + }), + ) + .context(error::RegisterProcedureLoaderSnafu { + type_name: WalPruneProcedure::TYPE_NAME, + })?; + common_runtime::spawn_global(async move { + self.run().await; + }); + info!("WalPruneProcedureManager Started."); + Ok(()) + } + + /// Returns a mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages. + pub(crate) fn channel() -> (Sender, Receiver) { + tokio::sync::mpsc::channel(1024) + } + + /// Runs the main loop. Performs actions on received events. + /// + /// - `Tick`: Submit `limit` [WalPruneProcedure]s to prune remote WAL. + pub(crate) async fn run(&mut self) { + while let Some(event) = self.receiver.recv().await { + match event { + Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| { + error!(e; "Failed to handle tick request"); + }), + } + } + } + + /// Submits a [WalPruneProcedure] for the given topic name. + pub async fn submit_procedure(&self, topic_name: &str) -> Result { + let guard = self + .tracker + .insert_running_procedure(topic_name.to_string()) + .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?; + + let procedure = WalPruneProcedure::new( + topic_name.to_string(), + self.wal_prune_context.clone(), + self.trigger_flush_threshold, + Some(guard), + ); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let procedure_id = procedure_with_id.id; + METRIC_META_REMOTE_WAL_PRUNE_EXECUTE + .with_label_values(&[topic_name]) + .inc(); + let procedure_manager = self.procedure_manager.clone(); + let mut watcher = procedure_manager + .submit(procedure_with_id) + .await + .context(error::SubmitProcedureSnafu)?; + watcher::wait(&mut watcher) + .await + .context(error::WaitProcedureSnafu)?; + + Ok(procedure_id) + } + + async fn handle_tick_request(&self) -> Result<()> { + let topics = self.retrieve_sorted_topics().await?; + let mut tasks = Vec::with_capacity(topics.len()); + for topic_name in topics.iter() { + tasks.push(async { + let _permit = self.semaphore.acquire().await.unwrap(); + match self.submit_procedure(topic_name).await { + Ok(_) => {} + Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => { + warn!("Prune task for topic {} is already running", topic); + } + Err(e) => { + error!( + "Failed to submit prune task for topic {}: {}", + topic_name.clone(), + e + ); + } + } + }); + } + + join_all(tasks).await; + Ok(()) + } + + /// Retrieve topics from the table metadata manager. + /// Since [WalPruneManager] submits procedures depending on the order of the topics, we should sort the topics. + /// TODO(CookiePie): Can register topics in memory instead of retrieving from the table metadata manager every time. 
+    async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
+        self.table_metadata_manager
+            .topic_name_manager()
+            .range()
+            .await
+            .context(error::TableMetadataManagerSnafu)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::assert_matches::assert_matches;
+
+    use common_meta::key::topic_name::TopicNameKey;
+    use common_wal::test_util::run_test_with_kafka_wal;
+    use tokio::time::{sleep, timeout};
+
+    use super::*;
+    use crate::procedure::wal_prune::test_util::TestEnv;
+
+    #[tokio::test]
+    async fn test_wal_prune_ticker() {
+        let (tx, mut rx) = WalPruneManager::channel();
+        let interval = Duration::from_millis(10);
+        let ticker = WalPruneTicker::new(interval, tx);
+        assert_eq!(ticker.name(), "WalPruneTicker");
+
+        for _ in 0..2 {
+            ticker.start();
+            sleep(2 * interval).await;
+            assert!(!rx.is_empty());
+            while let Ok(event) = rx.try_recv() {
+                assert_matches!(event, Event::Tick);
+            }
+        }
+        ticker.stop();
+    }
+
+    #[tokio::test]
+    async fn test_wal_prune_tracker_and_guard() {
+        let tracker = WalPruneProcedureTracker {
+            running_procedures: Arc::new(RwLock::new(HashSet::new())),
+        };
+        let topic_name = uuid::Uuid::new_v4().to_string();
+        {
+            let guard = tracker
+                .insert_running_procedure(topic_name.clone())
+                .unwrap();
+            assert_eq!(guard.topic_name, topic_name);
+            assert_eq!(guard.running_procedures.read().unwrap().len(), 1);
+
+            let result = tracker.insert_running_procedure(topic_name.clone());
+            assert!(result.is_none());
+        }
+        assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
+    }
+
+    async fn mock_wal_prune_manager(
+        broker_endpoints: Vec<String>,
+        limit: usize,
+    ) -> (Sender<Event>, WalPruneManager) {
+        let test_env = TestEnv::new();
+        let (tx, rx) = WalPruneManager::channel();
+        let wal_prune_context = test_env.build_wal_prune_context(broker_endpoints).await;
+        (
+            tx,
+            WalPruneManager::new(
+                test_env.table_metadata_manager.clone(),
+                limit,
+                rx,
+                test_env.procedure_manager.clone(),
+                wal_prune_context,
+                0,
+            ),
+        )
+    }
+
+    async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
+        let topic_name_keys = topics
+            .iter()
+            .map(|topic| TopicNameKey::new(topic))
+            .collect::<Vec<_>>();
+        manager
+            .table_metadata_manager
+            .topic_name_manager()
+            .batch_put(topic_name_keys)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_wal_prune_manager() {
+        run_test_with_kafka_wal(|broker_endpoints| {
+            Box::pin(async {
+                let limit = 6;
+                let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
+                let topics = (0..limit * 2)
+                    .map(|_| uuid::Uuid::new_v4().to_string())
+                    .collect::<Vec<_>>();
+                mock_topics(&manager, &topics).await;
+
+                let tracker = manager.tracker.clone();
+                let handler =
+                    common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
+                handler.await.unwrap();
+
+                tx.send(Event::Tick).await.unwrap();
+                // Wait for at least one procedure to be submitted.
+                timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
+                    .await
+                    .unwrap();
+            })
+        })
+        .await;
+    }
+}
diff --git a/src/meta-srv/src/procedure/wal_prune/test_util.rs b/src/meta-srv/src/procedure/wal_prune/test_util.rs
new file mode 100644
index 0000000000..b7cdbad286
--- /dev/null
+++ b/src/meta-srv/src/procedure/wal_prune/test_util.rs
@@ -0,0 +1,94 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
+use common_meta::kv_backend::memory::MemoryKvBackend;
+use common_meta::region_registry::{LeaderRegionRegistry, LeaderRegionRegistryRef};
+use common_meta::sequence::SequenceBuilder;
+use common_meta::state_store::KvStateStore;
+use common_meta::wal_options_allocator::build_kafka_client;
+use common_procedure::local::{LocalManager, ManagerConfig};
+use common_procedure::test_util::InMemoryPoisonStore;
+use common_procedure::ProcedureManagerRef;
+use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
+use common_wal::config::kafka::MetasrvKafkaConfig;
+use rskafka::client::Client;
+
+use crate::procedure::test_util::MailboxContext;
+use crate::procedure::wal_prune::Context as WalPruneContext;
+
+pub struct TestEnv {
+    pub table_metadata_manager: TableMetadataManagerRef,
+    pub leader_region_registry: LeaderRegionRegistryRef,
+    pub procedure_manager: ProcedureManagerRef,
+    pub mailbox: MailboxContext,
+    pub server_addr: String,
+}
+
+impl TestEnv {
+    pub fn new() -> Self {
+        let kv_backend = Arc::new(MemoryKvBackend::new());
+        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
+        let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
+        let mailbox_sequence =
+            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
+
+        let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
+        let poison_manager = Arc::new(InMemoryPoisonStore::default());
+        let procedure_manager = Arc::new(LocalManager::new(
+            ManagerConfig::default(),
+            state_store,
+            poison_manager,
+        ));
+
+        let mailbox_ctx = MailboxContext::new(mailbox_sequence);
+
+        Self {
+            table_metadata_manager,
+            leader_region_registry,
+            procedure_manager,
+            mailbox: mailbox_ctx,
+            server_addr: "localhost".to_string(),
+        }
+    }
+
+    async fn build_kafka_client(broker_endpoints: Vec<String>) -> Arc<Client> {
+        let kafka_topic = KafkaTopicConfig {
+            replication_factor: broker_endpoints.len() as i16,
+            ..Default::default()
+        };
+        let config = MetasrvKafkaConfig {
+            connection: KafkaConnectionConfig {
+                broker_endpoints,
+                ..Default::default()
+            },
+            kafka_topic,
+            ..Default::default()
+        };
+        Arc::new(build_kafka_client(&config).await.unwrap())
+    }
+
+    pub async fn build_wal_prune_context(&self, broker_endpoints: Vec<String>) -> WalPruneContext {
+        let client = Self::build_kafka_client(broker_endpoints).await;
+        WalPruneContext {
+            client,
+            table_metadata_manager: self.table_metadata_manager.clone(),
+            leader_region_registry: self.leader_region_registry.clone(),
+            server_addr: self.server_addr.to_string(),
+            mailbox: self.mailbox.mailbox().clone(),
+        }
+    }
+}
diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs
index eb35f43f19..904fc1f4e7 100644
--- a/src/meta-srv/src/selector/weight_compute.rs
+++ b/src/meta-srv/src/selector/weight_compute.rs
@@ -195,6 +195,8 @@ mod tests {
                     manifest_version: 0,
                     flushed_entry_id: 0,
                 },
+                data_topic_latest_entry_id: 0,
+                metadata_topic_latest_entry_id: 0,
             }],
             ..Default::default()
         }
@@ -220,6 +222,8 @@ mod tests {
                     manifest_version: 0,
                     flushed_entry_id: 0,
                 },
+                data_topic_latest_entry_id: 0,
+                metadata_topic_latest_entry_id: 0,
             }],
             ..Default::default()
         }
@@ -245,6 +249,8 @@ mod tests {
                     manifest_version: 0,
                     flushed_entry_id: 0,
                 },
+                data_topic_latest_entry_id: 0,
+                metadata_topic_latest_entry_id: 0,
             }],
             ..Default::default()
         }
diff --git a/tests/conf/metasrv-test.toml.template b/tests/conf/metasrv-test.toml.template
index 1196403a26..3daf6150f3 100644
--- a/tests/conf/metasrv-test.toml.template
+++ b/tests/conf/metasrv-test.toml.template
@@ -18,4 +18,5 @@ broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
 num_topics = 3
 selector_type = "round_robin"
 topic_name_prefix = "distributed_test_greptimedb_wal_topic"
+auto_prune_topic_records = true
 {{ endif }}