feat: enable submitting wal prune procedure periodically (#5867)

* feat: enable submitting wal prune procedure periodically

* chore: fix and add options

* test: add unit test

* test: fix unit test

* test: enable active_wal_pruning in test

* test: update default config

* chore: update config name

* refactor: use semaphore to control the number of prune process

* refactor: use split client for wal prune manager and topic creator

* chore: add configs

* chore: apply review comments

* fix: use tracker properly

* fix: use guard to track semaphore

* test: update unit tests

* chore: update config name

* chore: use prunable_entry_id

* refactor: semaphore to only limit the process of submitting

* chore: remove legacy sort

* chore: better configs

* fix: update config.md

* chore: respect fmt

* test: update unit tests

* chore: use interval_at

* fix: fix unit test

* test: fix unit test

* test: fix unit test

* chore: apply review comments

* docs: update config docs
Yuhan Wang
2025-04-19 00:02:33 +08:00
committed by GitHub
parent 41814bb49f
commit e817a65d75
26 changed files with 881 additions and 198 deletions

View File

@@ -2,13 +2,13 @@ meta:
configData: |-
[runtime]
global_rt_size = 4
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
num_topics = 3
auto_prune_topic_records = true
[datanode]
[datanode.client]
timeout = "120s"

View File

@@ -343,6 +343,9 @@
| `wal.provider` | String | `raft_engine` | -- |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.auto_prune_interval` | String | `0s` | Interval of automatic WAL pruning.<br/>Set to `0s` to disable automatic WAL pruning, which periodically deletes unused remote WAL entries. |
| `wal.trigger_flush_threshold` | Integer | `0` | The threshold to trigger a flush operation of a region during automatic WAL pruning.<br/>Metasrv will send a flush request to the region when:<br/>`trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`<br/>where:<br/>- `prunable_entry_id` is the maximum entry id of the region that can be pruned.<br/>- `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.<br/>Set to `0` to disable the flush operation. |
| `wal.auto_prune_parallelism` | Integer | `10` | Concurrent task limit for automatic WAL pruning. |
| `wal.num_topics` | Integer | `64` | Number of topics. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>Only accepts strings that match the following regular expression pattern:<br/>[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*<br/>e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |

View File

@@ -130,6 +130,22 @@ broker_endpoints = ["127.0.0.1:9092"]
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
auto_create_topics = true
## Interval of automatic WAL pruning.
## Set to `0s` to disable automatic WAL pruning, which periodically deletes unused remote WAL entries.
auto_prune_interval = "0s"
## The threshold to trigger a flush operation of a region during automatic WAL pruning.
## Metasrv will send a flush request to the region when:
## `trigger_flush_threshold` + `prunable_entry_id` < `max_prunable_entry_id`
## where:
## - `prunable_entry_id` is the maximum entry id of the region that can be pruned.
## - `max_prunable_entry_id` is the maximum prunable entry id among all regions in the same topic.
## Set to `0` to disable the flush operation.
trigger_flush_threshold = 0
## Concurrent task limit for automatic WAL pruning.
auto_prune_parallelism = 10
## Number of topics.
num_topics = 64
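
To make the flush-trigger rule described in these comments concrete, here is a minimal Rust sketch of the condition; the function name `should_flush` and the sample numbers are hypothetical illustrations, not part of this change:

fn should_flush(
    prunable_entry_id: u64,
    max_prunable_entry_id: u64,
    trigger_flush_threshold: u64,
) -> bool {
    // A threshold of 0 disables flush requests entirely.
    trigger_flush_threshold != 0
        && prunable_entry_id + trigger_flush_threshold < max_prunable_entry_id
}

fn main() {
    // A region lagging far behind the most advanced region in the same topic gets flushed.
    assert!(should_flush(100, 200, 10));
    // With the default threshold of 0, no flush request is ever sent.
    assert!(!should_flush(100, 200, 0));
}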

View File

@@ -779,6 +779,8 @@ impl InformationExtension for StandaloneInformationExtension {
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
}
})
.collect::<Vec<_>>();

View File

@@ -94,6 +94,13 @@ pub struct RegionStat {
pub index_size: u64,
/// The manifest info of the region.
pub region_manifest: RegionManifestInfo,
/// The latest entry id of topic used by data.
/// **Only used by remote WAL prune.**
pub data_topic_latest_entry_id: u64,
/// The latest entry id of topic used by metadata.
/// **Only used by remote WAL prune.**
/// In mito engine, this is the same as `data_topic_latest_entry_id`.
pub metadata_topic_latest_entry_id: u64,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
@@ -264,6 +271,8 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
}
}
}

View File

@@ -19,7 +19,7 @@ use std::sync::{Arc, RwLock};
use common_telemetry::warn;
use store_api::storage::RegionId;
use crate::datanode::RegionManifestInfo;
use crate::datanode::{RegionManifestInfo, RegionStat};
/// Represents information about a leader region in the cluster.
/// Contains the datanode id where the leader is located,
@@ -35,25 +35,22 @@ pub enum LeaderRegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
topic_latest_entry_id: u64,
},
Metric {
data_manifest_version: u64,
data_flushed_entry_id: u64,
data_topic_latest_entry_id: u64,
metadata_manifest_version: u64,
metadata_flushed_entry_id: u64,
metadata_topic_latest_entry_id: u64,
},
}
impl From<RegionManifestInfo> for LeaderRegionManifestInfo {
fn from(value: RegionManifestInfo) -> Self {
match value {
RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
} => LeaderRegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
},
impl LeaderRegionManifestInfo {
/// Generate a [LeaderRegionManifestInfo] from [RegionStat].
pub fn from_region_stat(region_stat: &RegionStat) -> LeaderRegionManifestInfo {
match region_stat.region_manifest {
RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
@@ -62,14 +59,22 @@ impl From<RegionManifestInfo> for LeaderRegionManifestInfo {
} => LeaderRegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
},
RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
} => LeaderRegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
},
}
}
}
impl LeaderRegionManifestInfo {
/// Returns the manifest version of the leader region.
pub fn manifest_version(&self) -> u64 {
match self {
@@ -96,17 +101,33 @@ impl LeaderRegionManifestInfo {
}
}
/// Returns the minimum flushed entry id of the leader region.
pub fn min_flushed_entry_id(&self) -> u64 {
/// Returns the prunable entry id of the leader region.
/// It is used to determine the entry id that can be pruned in the remote WAL.
///
/// For a mito region, the prunable entry id should be max(flushed_entry_id, latest_entry_id_since_flush).
///
/// For a metric region, the prunable entry id should be min(
/// max(data_flushed_entry_id, data_latest_entry_id_since_flush),
/// max(metadata_flushed_entry_id, metadata_latest_entry_id_since_flush)
/// ).
pub fn prunable_entry_id(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
flushed_entry_id, ..
} => *flushed_entry_id,
LeaderRegionManifestInfo::Metric {
data_flushed_entry_id,
data_topic_latest_entry_id,
metadata_flushed_entry_id,
metadata_topic_latest_entry_id,
..
} => (*data_flushed_entry_id).min(*metadata_flushed_entry_id),
} => {
let data_prunable_entry_id =
(*data_flushed_entry_id).max(*data_topic_latest_entry_id);
let metadata_prunable_entry_id =
(*metadata_flushed_entry_id).max(*metadata_topic_latest_entry_id);
data_prunable_entry_id.min(metadata_prunable_entry_id)
}
}
}
}
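
As a quick sanity check of the rule documented above, here is a hedged worked example with made-up entry ids (not taken from this patch):

fn main() {
    // Metric region, data side: max(flushed_entry_id = 100, topic_latest_entry_id = 120) = 120.
    let data_prunable_entry_id = 100u64.max(120);
    // Metadata side: max(flushed_entry_id = 90, topic_latest_entry_id = 110) = 110.
    let metadata_prunable_entry_id = 90u64.max(110);
    // The region's prunable entry id is the smaller of the two, so entries up to 110 may be pruned.
    assert_eq!(data_prunable_entry_id.min(metadata_prunable_entry_id), 110);
}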

View File

@@ -30,7 +30,9 @@ use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
use crate::key::NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
pub use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
pub use crate::wal_options_allocator::topic_creator::{
build_kafka_client, build_kafka_topic_creator,
};
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::{error, info};
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::MetasrvKafkaConfig;
@@ -34,11 +32,9 @@ use crate::error::{
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
type KafkaClientRef = Arc<Client>;
/// Creates topics in kafka.
pub struct KafkaTopicCreator {
client: KafkaClientRef,
client: Client,
/// The number of partitions per topic.
num_partitions: i32,
/// The replication factor of each topic.
@@ -48,7 +44,7 @@ pub struct KafkaTopicCreator {
}
impl KafkaTopicCreator {
pub fn client(&self) -> &KafkaClientRef {
pub fn client(&self) -> &Client {
&self.client
}
@@ -133,7 +129,8 @@ impl KafkaTopicCreator {
}
}
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
/// Builds a kafka [Client](rskafka::client::Client).
pub async fn build_kafka_client(config: &MetasrvKafkaConfig) -> Result<Client> {
// Builds a Kafka controller client for creating topics.
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
@@ -145,15 +142,19 @@ pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<Ka
if let Some(tls) = &config.connection.tls {
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
};
let client = builder
builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: config.connection.broker_endpoints.clone(),
})?;
})
}
/// Builds a [KafkaTopicCreator].
pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<KafkaTopicCreator> {
let client = build_kafka_client(config).await?;
Ok(KafkaTopicCreator {
client: Arc::new(client),
client,
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,

View File

@@ -15,6 +15,8 @@
pub mod kafka;
pub mod raft_engine;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
@@ -53,11 +55,32 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
connection: config.connection,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
auto_prune_interval: config.auto_prune_interval,
trigger_flush_threshold: config.trigger_flush_threshold,
auto_prune_parallelism: config.auto_prune_parallelism,
}),
}
}
}
impl MetasrvWalConfig {
/// Returns whether active WAL pruning is enabled.
pub fn enable_active_wal_pruning(&self) -> bool {
match self {
MetasrvWalConfig::RaftEngine => false,
MetasrvWalConfig::Kafka(config) => config.auto_prune_interval > Duration::ZERO,
}
}
/// Gets the remote WAL (Kafka) options.
pub fn remote_wal_options(&self) -> Option<&MetasrvKafkaConfig> {
match self {
MetasrvWalConfig::RaftEngine => None,
MetasrvWalConfig::Kafka(config) => Some(config),
}
}
}
impl From<MetasrvWalConfig> for DatanodeWalConfig {
fn from(config: MetasrvWalConfig) -> Self {
match config {
@@ -181,6 +204,9 @@ mod tests {
create_topic_timeout: Duration::from_secs(30),
},
auto_create_topics: true,
auto_prune_interval: Duration::from_secs(0),
trigger_flush_threshold: 0,
auto_prune_parallelism: 10,
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));

View File

@@ -30,6 +30,13 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
deadline: Some(Duration::from_secs(120)),
};
/// Default interval for active WAL pruning.
pub const DEFAULT_ACTIVE_PRUNE_INTERVAL: Duration = Duration::ZERO;
/// Default limit for concurrent active pruning tasks.
pub const DEFAULT_ACTIVE_PRUNE_TASK_LIMIT: usize = 10;
/// Default threshold for triggering region flush requests when pruning remote WAL.
pub const DEFAULT_TRIGGER_FLUSH_THRESHOLD: u64 = 0;
use crate::error::{self, Result};
use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};

View File

@@ -17,7 +17,10 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use crate::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_ACTIVE_PRUNE_INTERVAL,
DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, DEFAULT_TRIGGER_FLUSH_THRESHOLD,
};
/// Kafka wal configurations for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -44,6 +47,15 @@ pub struct DatanodeKafkaConfig {
pub dump_index_interval: Duration,
/// Ignore missing entries during read WAL.
pub overwrite_entry_start_id: bool,
// Active WAL pruning.
pub auto_prune_topic_records: bool,
// Interval of WAL pruning.
pub auto_prune_interval: Duration,
// Threshold for sending flush requests when pruning remote WAL.
// `0` stands for never sending flush requests.
pub trigger_flush_threshold: u64,
// Limit of concurrent active pruning procedures.
pub auto_prune_parallelism: usize,
}
impl Default for DatanodeKafkaConfig {
@@ -58,6 +70,10 @@ impl Default for DatanodeKafkaConfig {
create_index: true,
dump_index_interval: Duration::from_secs(60),
overwrite_entry_start_id: false,
auto_prune_topic_records: false,
auto_prune_interval: DEFAULT_ACTIVE_PRUNE_INTERVAL,
trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD,
auto_prune_parallelism: DEFAULT_ACTIVE_PRUNE_TASK_LIMIT,
}
}
}

View File

@@ -12,9 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use crate::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_ACTIVE_PRUNE_INTERVAL,
DEFAULT_ACTIVE_PRUNE_TASK_LIMIT, DEFAULT_TRIGGER_FLUSH_THRESHOLD,
};
/// Kafka wal configurations for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -28,6 +33,13 @@ pub struct MetasrvKafkaConfig {
pub kafka_topic: KafkaTopicConfig,
// Automatically create topics for WAL.
pub auto_create_topics: bool,
// Interval of WAL pruning.
pub auto_prune_interval: Duration,
// Threshold for sending flush requests when pruning remote WAL.
// `0` stands for never sending flush requests.
pub trigger_flush_threshold: u64,
// Limit of concurrent active pruning procedures.
pub auto_prune_parallelism: usize,
}
impl Default for MetasrvKafkaConfig {
@@ -36,6 +48,9 @@ impl Default for MetasrvKafkaConfig {
connection: Default::default(),
kafka_topic: Default::default(),
auto_create_topics: true,
auto_prune_interval: DEFAULT_ACTIVE_PRUNE_INTERVAL,
trigger_flush_threshold: DEFAULT_TRIGGER_FLUSH_THRESHOLD,
auto_prune_parallelism: DEFAULT_ACTIVE_PRUNE_TASK_LIMIT,
}
}
}

View File

@@ -518,6 +518,13 @@ pub enum Error {
source: common_procedure::Error,
},
#[snafu(display("A prune task for topic {} is already running", topic))]
PruneTaskAlreadyRunning {
topic: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Schema already exists, name: {schema_name}"))]
SchemaAlreadyExists {
schema_name: String,
@@ -788,6 +795,14 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to build kafka client."))]
BuildKafkaClient {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: common_meta::error::Error,
},
#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
@@ -875,7 +890,9 @@ impl ErrorExt for Error {
| Error::FlowStateHandler { .. }
| Error::BuildWalOptionsAllocator { .. }
| Error::BuildPartitionClient { .. }
| Error::DeleteRecords { .. } => StatusCode::Internal,
| Error::BuildKafkaClient { .. }
| Error::DeleteRecords { .. }
| Error::PruneTaskAlreadyRunning { .. } => StatusCode::Internal,
Error::Unsupported { .. } => StatusCode::Unsupported,

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::region_registry::LeaderRegion;
use common_meta::region_registry::{LeaderRegion, LeaderRegionManifestInfo};
use store_api::region_engine::RegionRole;
use crate::error::Result;
@@ -44,7 +44,7 @@ impl HeartbeatHandler for CollectLeaderRegionHandler {
continue;
}
let manifest = stat.region_manifest.into();
let manifest = LeaderRegionManifestInfo::from_region_stat(stat);
let value = LeaderRegion {
datanode_id: current_stat.id,
manifest,
@@ -122,6 +122,8 @@ mod tests {
manifest_size: 0,
sst_size: 0,
index_size: 0,
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
}
}
@@ -161,6 +163,7 @@ mod tests {
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 1,
flushed_entry_id: 0,
topic_latest_entry_id: 0,
},
})
);
@@ -192,6 +195,7 @@ mod tests {
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 2,
flushed_entry_id: 0,
topic_latest_entry_id: 0,
},
})
);
@@ -224,6 +228,7 @@ mod tests {
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 2,
flushed_entry_id: 0,
topic_latest_entry_id: 0,
},
})
);

View File

@@ -102,6 +102,8 @@ mod tests {
manifest_version: 0,
flushed_entry_id: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
}
}
acc.stat = Some(Stat {

View File

@@ -160,6 +160,8 @@ mod test {
manifest_version: 0,
flushed_entry_id: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
}
}

View File

@@ -15,6 +15,7 @@
#![feature(result_flattening)]
#![feature(assert_matches)]
#![feature(extract_if)]
#![feature(hash_set_entry)]
pub mod bootstrap;
pub mod cache_invalidator;

View File

@@ -61,6 +61,7 @@ use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::lease::lookup_datanode_peer;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
@@ -407,6 +408,7 @@ pub struct Metasrv {
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
cache_invalidator: CacheInvalidatorRef,
leader_region_registry: LeaderRegionRegistryRef,
wal_prune_ticker: Option<WalPruneTickerRef>,
plugins: Plugins,
}
@@ -461,6 +463,9 @@ impl Metasrv {
if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
}
if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
}
if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
customizer.customize(&mut leadership_change_notifier);
}

View File

@@ -37,7 +37,7 @@ use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
@@ -58,6 +58,8 @@ use crate::metasrv::{
};
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
DEFAULT_TICK_INTERVAL,
@@ -346,6 +348,40 @@ impl MetasrvBuilder {
.context(error::InitDdlManagerSnafu)?,
);
// remote WAL prune ticker and manager
let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
let (tx, rx) = WalPruneManager::channel();
// Safety: Must be remote WAL.
let remote_wal_options = options.wal.remote_wal_options().unwrap();
let kafka_client = build_kafka_client(remote_wal_options)
.await
.context(error::BuildKafkaClientSnafu)?;
let wal_prune_context = WalPruneContext {
client: Arc::new(kafka_client),
table_metadata_manager: table_metadata_manager.clone(),
leader_region_registry: leader_region_registry.clone(),
server_addr: options.server_addr.clone(),
mailbox: mailbox.clone(),
};
let wal_prune_manager = WalPruneManager::new(
table_metadata_manager.clone(),
remote_wal_options.auto_prune_parallelism,
rx,
procedure_manager.clone(),
wal_prune_context,
remote_wal_options.trigger_flush_threshold,
);
// Start manager in background. Ticker will be started in the main thread to send ticks.
wal_prune_manager.try_start().await?;
let wal_prune_ticker = Arc::new(WalPruneTicker::new(
remote_wal_options.auto_prune_interval,
tx.clone(),
));
Some(wal_prune_ticker)
} else {
None
};
let customized_region_lease_renewer = plugins
.as_ref()
.and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
@@ -406,6 +442,7 @@ impl MetasrvBuilder {
region_supervisor_ticker,
cache_invalidator,
leader_region_registry,
wal_prune_ticker,
})
}
}

View File

@@ -66,4 +66,7 @@ lazy_static! {
// The heartbeat rate counter.
pub static ref METRIC_META_HEARTBEAT_RATE: IntCounter =
register_int_counter!("greptime_meta_heartbeat_rate", "meta heartbeat arrival rate").unwrap();
/// The remote WAL prune execute counter.
pub static ref METRIC_META_REMOTE_WAL_PRUNE_EXECUTE: IntCounterVec =
register_int_counter_vec!("greptime_meta_remote_wal_prune_execute", "meta remote wal prune execute", &["topic_name"]).unwrap();
}

View File

@@ -179,8 +179,8 @@ pub async fn new_wal_prune_metadata(
) -> (EntryId, Vec<RegionId>) {
let datanode_id = 1;
let from_peer = Peer::empty(datanode_id);
let mut min_flushed_entry_id = u64::MAX;
let mut max_flushed_entry_id = 0;
let mut min_prunable_entry_id = u64::MAX;
let mut max_prunable_entry_id = 0;
let mut region_entry_ids = HashMap::with_capacity(n_table as usize * n_region as usize);
for table_id in 0..n_table {
let region_ids = (0..n_region)
@@ -221,10 +221,10 @@ pub async fn new_wal_prune_metadata(
.iter()
.map(|region_id| {
let rand_n = rand::random::<u64>() as usize;
let current_flushed_entry_id = offsets[rand_n % offsets.len()] as u64;
min_flushed_entry_id = min_flushed_entry_id.min(current_flushed_entry_id);
max_flushed_entry_id = max_flushed_entry_id.max(current_flushed_entry_id);
(*region_id, current_flushed_entry_id)
let current_prunable_entry_id = offsets[rand_n % offsets.len()] as u64;
min_prunable_entry_id = min_prunable_entry_id.min(current_prunable_entry_id);
max_prunable_entry_id = max_prunable_entry_id.max(current_prunable_entry_id);
(*region_id, current_prunable_entry_id)
})
.collect::<HashMap<_, _>>();
region_entry_ids.extend(current_region_entry_ids.clone());
@@ -235,15 +235,15 @@ pub async fn new_wal_prune_metadata(
let regions_to_flush = region_entry_ids
.iter()
.filter_map(|(region_id, flushed_entry_id)| {
if max_flushed_entry_id - flushed_entry_id > threshold {
.filter_map(|(region_id, prunable_entry_id)| {
if max_prunable_entry_id - prunable_entry_id > threshold {
Some(*region_id)
} else {
None
}
})
.collect::<Vec<_>>();
(min_flushed_entry_id, regions_to_flush)
(min_prunable_entry_id, regions_to_flush)
}
pub async fn update_in_memory_region_flushed_entry_id(
@@ -257,6 +257,7 @@ pub async fn update_in_memory_region_flushed_entry_id(
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 0,
flushed_entry_id,
topic_latest_entry_id: 0,
},
};
key_values.push((region_id, value));

View File

@@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod manager;
#[cfg(test)]
mod test_util;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
@@ -28,9 +32,10 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::warn;
use common_telemetry::{info, warn};
use itertools::{Itertools, MinMaxResult};
use log_store::kafka::DEFAULT_PARTITION;
use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
@@ -45,9 +50,8 @@ use crate::error::{
use crate::service::mailbox::{Channel, MailboxRef};
use crate::Result;
type KafkaClientRef = Arc<Client>;
pub type KafkaClientRef = Arc<Client>;
/// No timeout for flush request.
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(1);
/// The state of WAL pruning.
@@ -58,17 +62,18 @@ pub enum WalPruneState {
Prune,
}
#[derive(Clone)]
pub struct Context {
/// The Kafka client.
client: KafkaClientRef,
pub client: KafkaClientRef,
/// The table metadata manager.
table_metadata_manager: TableMetadataManagerRef,
pub table_metadata_manager: TableMetadataManagerRef,
/// The leader region registry.
leader_region_registry: LeaderRegionRegistryRef,
pub leader_region_registry: LeaderRegionRegistryRef,
/// Server address of metasrv.
server_addr: String,
pub server_addr: String,
/// The mailbox to send messages.
mailbox: MailboxRef,
pub mailbox: MailboxRef,
}
/// The data of WAL pruning.
@@ -77,10 +82,11 @@ pub struct WalPruneData {
/// The topic name to prune.
pub topic: String,
/// The minimum flush entry id for topic, which is used to prune the WAL.
pub min_flushed_entry_id: EntryId,
pub prunable_entry_id: EntryId,
pub regions_to_flush: Vec<RegionId>,
/// If `flushed_entry_id` + `trigger_flush_threshold` < `max_flushed_entry_id`, send a flush request to the region.
pub trigger_flush_threshold: Option<u64>,
/// If `prunable_entry_id` + `trigger_flush_threshold` < `max_prunable_entry_id`, send a flush request to the region.
/// If `0`, never send flush requests.
pub trigger_flush_threshold: u64,
/// The state.
pub state: WalPruneState,
}
@@ -89,27 +95,43 @@ pub struct WalPruneData {
pub struct WalPruneProcedure {
pub data: WalPruneData,
pub context: Context,
pub _guard: Option<WalPruneProcedureGuard>,
}
impl WalPruneProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
pub fn new(topic: String, context: Context, trigger_flush_threshold: Option<u64>) -> Self {
pub fn new(
topic: String,
context: Context,
trigger_flush_threshold: u64,
guard: Option<WalPruneProcedureGuard>,
) -> Self {
Self {
data: WalPruneData {
topic,
min_flushed_entry_id: 0,
prunable_entry_id: 0,
trigger_flush_threshold,
regions_to_flush: vec![],
state: WalPruneState::Prepare,
},
context,
_guard: guard,
}
}
pub fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
pub fn from_json(
json: &str,
context: &Context,
tracker: WalPruneProcedureTracker,
) -> ProcedureResult<Self> {
let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
Ok(Self { data, context })
let guard = tracker.insert_running_procedure(data.topic.clone());
Ok(Self {
data,
context: context.clone(),
_guard: guard,
})
}
async fn build_peer_to_region_ids_map(
@@ -182,20 +204,20 @@ impl WalPruneProcedure {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to get topic-region map",
})?;
let flush_entry_ids_map: HashMap<_, _> = self
let prunable_entry_ids_map: HashMap<_, _> = self
.context
.leader_region_registry
.batch_get(region_ids.iter().cloned())
.into_iter()
.map(|(region_id, region)| {
let flushed_entry_id = region.manifest.min_flushed_entry_id();
(region_id, flushed_entry_id)
let prunable_entry_id = region.manifest.prunable_entry_id();
(region_id, prunable_entry_id)
})
.collect();
// Check if the `flush_entry_ids_map` contains all region ids.
// Check if the `prunable_entry_ids_map` contains all region ids.
let non_collected_region_ids =
check_heartbeat_collected_region_ids(&region_ids, &flush_entry_ids_map);
check_heartbeat_collected_region_ids(&region_ids, &prunable_entry_ids_map);
if !non_collected_region_ids.is_empty() {
// The heartbeat collected region ids do not contain all region ids in the topic-region map.
// In this case, we should not prune the WAL.
@@ -204,23 +226,23 @@ impl WalPruneProcedure {
return Ok(Status::done());
}
let min_max_result = flush_entry_ids_map.values().minmax();
let max_flushed_entry_id = match min_max_result {
let min_max_result = prunable_entry_ids_map.values().minmax();
let max_prunable_entry_id = match min_max_result {
MinMaxResult::NoElements => {
return Ok(Status::done());
}
MinMaxResult::OneElement(flushed_entry_id) => {
self.data.min_flushed_entry_id = *flushed_entry_id;
*flushed_entry_id
MinMaxResult::OneElement(prunable_entry_id) => {
self.data.prunable_entry_id = *prunable_entry_id;
*prunable_entry_id
}
MinMaxResult::MinMax(min_flushed_entry_id, max_flushed_entry_id) => {
self.data.min_flushed_entry_id = *min_flushed_entry_id;
*max_flushed_entry_id
MinMaxResult::MinMax(min_prunable_entry_id, max_prunable_entry_id) => {
self.data.prunable_entry_id = *min_prunable_entry_id;
*max_prunable_entry_id
}
};
if let Some(threshold) = self.data.trigger_flush_threshold {
for (region_id, flushed_entry_id) in flush_entry_ids_map {
if flushed_entry_id + threshold < max_flushed_entry_id {
if self.data.trigger_flush_threshold != 0 {
for (region_id, prunable_entry_id) in prunable_entry_ids_map {
if prunable_entry_id + self.data.trigger_flush_threshold < max_prunable_entry_id {
self.data.regions_to_flush.push(region_id);
}
}
@@ -232,10 +254,17 @@ impl WalPruneProcedure {
}
/// Send the flush request to regions with low flush entry id.
///
/// Retry:
/// - Failed to build peer to region ids map. It means failure in retrieving metadata.
pub async fn on_sending_flush_request(&mut self) -> Result<Status> {
let peer_to_region_ids_map = self
.build_peer_to_region_ids_map(&self.context, &self.data.regions_to_flush)
.await?;
.await
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to build peer to region ids map",
})?;
let flush_instructions = self.build_flush_region_instruction(peer_to_region_ids_map)?;
for (peer, flush_instruction) in flush_instructions.into_iter() {
let msg = MailboxMessage::json_message(
@@ -255,13 +284,13 @@ impl WalPruneProcedure {
Ok(Status::executing(true))
}
/// Prune the WAL and persist the minimum flushed entry id.
/// Prune the WAL and persist the minimum prunable entry id.
///
/// Retry:
/// - Failed to update the minimum flushed entry id in kvbackend.
/// - Failed to update the minimum prunable entry id in kvbackend.
/// - Failed to delete records.
pub async fn on_prune(&mut self) -> Result<Status> {
// Safety: flushed_entry_ids are loaded in on_prepare.
// Safety: `prunable_entry_id` is loaded in on_prepare.
let partition_client = self
.context
.client
@@ -276,7 +305,7 @@ impl WalPruneProcedure {
partition: DEFAULT_PARTITION,
})?;
// Should update the min flushed entry id in the kv backend before deleting records.
// Should update the min prunable entry id in the kv backend before deleting records.
// Otherwise, when a datanode restarts, it will not be able to find the wal entries.
let prev = self
.context
@@ -292,7 +321,7 @@ impl WalPruneProcedure {
self.context
.table_metadata_manager
.topic_name_manager()
.update(&self.data.topic, self.data.min_flushed_entry_id, prev)
.update(&self.data.topic, self.data.prunable_entry_id, prev)
.await
.context(UpdateTopicNameValueSnafu {
topic: &self.data.topic,
@@ -306,14 +335,14 @@ impl WalPruneProcedure {
})?;
partition_client
.delete_records(
(self.data.min_flushed_entry_id + 1) as i64,
(self.data.prunable_entry_id + 1) as i64,
DELETE_RECORDS_TIMEOUT.as_millis() as i32,
)
.await
.context(DeleteRecordsSnafu {
topic: &self.data.topic,
partition: DEFAULT_PARTITION,
offset: (self.data.min_flushed_entry_id + 1),
offset: (self.data.prunable_entry_id + 1),
})
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
@@ -321,9 +350,13 @@ impl WalPruneProcedure {
"Failed to delete records for topic: {}, partition: {}, offset: {}",
self.data.topic,
DEFAULT_PARTITION,
self.data.min_flushed_entry_id + 1
self.data.prunable_entry_id + 1
),
})?;
info!(
"Successfully pruned WAL for topic: {}, entry id: {}",
self.data.topic, self.data.prunable_entry_id
);
Ok(Status::done())
}
}
@@ -388,123 +421,41 @@ mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::HeartbeatResponse;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::build_kafka_topic_creator;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use rskafka::record::Record;
use tokio::sync::mpsc::Receiver;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::test_util::{new_wal_prune_metadata, MailboxContext};
struct TestEnv {
table_metadata_manager: TableMetadataManagerRef,
leader_region_registry: LeaderRegionRegistryRef,
mailbox: MailboxContext,
server_addr: String,
}
impl TestEnv {
fn new() -> Self {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
Self {
table_metadata_manager,
leader_region_registry,
mailbox: mailbox_ctx,
server_addr: "localhost".to_string(),
}
}
fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
fn leader_region_registry(&self) -> &LeaderRegionRegistryRef {
&self.leader_region_registry
}
fn mailbox_context(&self) -> &MailboxContext {
&self.mailbox
}
fn server_addr(&self) -> &str {
&self.server_addr
}
}
use crate::procedure::test_util::new_wal_prune_metadata;
// Fix this import to correctly point to the test_util module
use crate::procedure::wal_prune::test_util::TestEnv;
/// Mock a test env for testing.
/// Including:
/// 1. Prepare some data in the table metadata manager and in-memory kv backend.
/// 2. Generate a `WalPruneProcedure` with the test env.
/// 3. Return the procedure, the minimum last entry id to prune and the regions to flush.
async fn mock_test_env(
topic: String,
broker_endpoints: Vec<String>,
env: &TestEnv,
) -> (WalPruneProcedure, u64, Vec<RegionId>) {
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let table_metadata_manager = env.table_metadata_manager().clone();
let leader_region_registry = env.leader_region_registry().clone();
let mailbox = env.mailbox_context().mailbox().clone();
/// 2. Return the minimum prunable entry id and the regions to flush.
async fn mock_test_data(procedure: &WalPruneProcedure) -> (u64, Vec<RegionId>) {
let n_region = 10;
let n_table = 5;
let threshold = 10;
// 5 entries per region.
let offsets = mock_wal_entries(
topic_creator.client().clone(),
&topic,
procedure.context.client.clone(),
&procedure.data.topic,
(n_region * n_table * 5) as usize,
)
.await;
let (min_flushed_entry_id, regions_to_flush) = new_wal_prune_metadata(
table_metadata_manager.clone(),
leader_region_registry.clone(),
let (prunable_entry_id, regions_to_flush) = new_wal_prune_metadata(
procedure.context.table_metadata_manager.clone(),
procedure.context.leader_region_registry.clone(),
n_region,
n_table,
&offsets,
threshold,
topic.clone(),
procedure.data.trigger_flush_threshold,
procedure.data.topic.clone(),
)
.await;
let context = Context {
client: topic_creator.client().clone(),
table_metadata_manager,
leader_region_registry,
mailbox,
server_addr: env.server_addr().to_string(),
};
let wal_prune_procedure = WalPruneProcedure::new(topic, context, Some(threshold));
(wal_prune_procedure, min_flushed_entry_id, regions_to_flush)
(prunable_entry_id, regions_to_flush)
}
fn record(i: usize) -> Record {
@@ -603,10 +554,18 @@ mod tests {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
common_telemetry::init_default_ut_logging();
let topic_name = "greptime_test_topic".to_string();
let mut topic_name = uuid::Uuid::new_v4().to_string();
// Topic should start with a letter.
topic_name = format!("test_procedure_execution-{}", topic_name);
let mut env = TestEnv::new();
let (mut procedure, min_flushed_entry_id, regions_to_flush) =
mock_test_env(topic_name.clone(), broker_endpoints, &env).await;
let context = env.build_wal_prune_context(broker_endpoints).await;
let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);
// Before any data in kvbackend is mocked, should return a retryable error.
let result = procedure.on_prune().await;
assert_matches!(result, Err(e) if e.is_retryable());
let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await;
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
@@ -618,7 +577,7 @@ mod tests {
}
);
assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
assert_eq!(procedure.data.min_flushed_entry_id, min_flushed_entry_id);
assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
assert_eq!(
procedure.data.regions_to_flush.len(),
regions_to_flush.len()
@@ -646,34 +605,31 @@ mod tests {
// Step 3: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check if the entry ids after `min_flushed_entry_id` still exist.
// Check if the entry ids after `prunable_entry_id` still exist.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64 + 1,
procedure.data.prunable_entry_id as i64 + 1,
true,
)
.await;
// Check if the entry ids before `min_flushed_entry_id` are deleted.
// Check if the entry ids before `prunable_entry_id` are deleted.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64,
procedure.data.prunable_entry_id as i64,
false,
)
.await;
let min_entry_id = env
.table_metadata_manager()
let value = env
.table_metadata_manager
.topic_name_manager()
.get(&topic_name)
.await
.unwrap()
.unwrap();
assert_eq!(
min_entry_id.pruned_entry_id,
procedure.data.min_flushed_entry_id
);
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
@@ -682,13 +638,10 @@ mod tests {
assert_matches!(status, Status::Done { output: None });
// Step 5: Test `on_prepare`, don't flush regions.
procedure.data.trigger_flush_threshold = None;
procedure.data.trigger_flush_threshold = 0;
procedure.on_prepare().await.unwrap();
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(
min_entry_id.pruned_entry_id,
procedure.data.min_flushed_entry_id
);
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;

View File

@@ -0,0 +1,438 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_set::Entry;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use common_meta::key::TableMetadataManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::join_all;
use snafu::{OptionExt, ResultExt};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Semaphore;
use tokio::time::{interval_at, Instant, MissedTickBehavior};
use crate::error::{self, Result};
use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
pub type WalPruneTickerRef = Arc<WalPruneTicker>;
/// Tracks running [WalPruneProcedure]s and the resources they hold.
/// A [WalPruneProcedure] holds a semaphore permit to limit the number of concurrent procedures.
///
/// TODO(CookiePie): Similar to [RegionMigrationProcedureTracker], maybe can refactor to a unified framework.
#[derive(Clone)]
pub struct WalPruneProcedureTracker {
running_procedures: Arc<RwLock<HashSet<String>>>,
}
impl WalPruneProcedureTracker {
/// Inserts a running [WalPruneProcedure] for the given topic name.
/// Returns `None` if a procedure for the same topic is already running.
pub fn insert_running_procedure(&self, topic_name: String) -> Option<WalPruneProcedureGuard> {
let mut running_procedures = self.running_procedures.write().unwrap();
match running_procedures.entry(topic_name.clone()) {
Entry::Occupied(_) => None,
Entry::Vacant(entry) => {
entry.insert();
Some(WalPruneProcedureGuard {
topic_name,
running_procedures: self.running_procedures.clone(),
})
}
}
}
/// Number of running [WalPruneProcedure]s.
pub fn len(&self) -> usize {
self.running_procedures.read().unwrap().len()
}
}
/// [WalPruneProcedureGuard] is a guard for [WalPruneProcedure].
/// It is used to track the running [WalPruneProcedure]s.
/// When the guard is dropped, it removes the topic name from the running procedures.
pub struct WalPruneProcedureGuard {
topic_name: String,
running_procedures: Arc<RwLock<HashSet<String>>>,
}
impl Drop for WalPruneProcedureGuard {
fn drop(&mut self) {
let mut running_procedures = self.running_procedures.write().unwrap();
running_procedures.remove(&self.topic_name);
}
}
/// Event is used to notify the [WalPruneManager] to do some work.
///
/// - `Tick`: Trigger a submission of [WalPruneProcedure] to prune remote WAL.
pub enum Event {
Tick,
}
impl Debug for Event {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Event::Tick => write!(f, "Tick"),
}
}
}
/// [WalPruneTicker] is a ticker that periodically sends [Event]s to the [WalPruneManager].
/// It is used to trigger the [WalPruneManager] to submit [WalPruneProcedure]s.
pub(crate) struct WalPruneTicker {
/// Handle of ticker thread.
pub(crate) tick_handle: Mutex<Option<JoinHandle<()>>>,
/// The interval of tick.
pub(crate) tick_interval: Duration,
/// Sends [Event]s.
pub(crate) sender: Sender<Event>,
}
#[async_trait::async_trait]
impl LeadershipChangeListener for WalPruneTicker {
fn name(&self) -> &'static str {
"WalPruneTicker"
}
async fn on_leader_start(&self) -> common_meta::error::Result<()> {
self.start();
Ok(())
}
async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
self.stop();
Ok(())
}
}
/// TODO(CookiePie): Similar to [RegionSupervisorTicker], maybe can refactor to a unified framework.
impl WalPruneTicker {
pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
Self {
tick_handle: Mutex::new(None),
tick_interval,
sender,
}
}
/// Starts the ticker.
pub fn start(&self) {
let mut handle = self.tick_handle.lock().unwrap();
if handle.is_none() {
let sender = self.sender.clone();
let tick_interval = self.tick_interval;
let ticker_loop = tokio::spawn(async move {
let mut interval = interval_at(Instant::now() + tick_interval, tick_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
if sender.send(Event::Tick).await.is_err() {
info!("EventReceiver is dropped, tick loop is stopped");
break;
}
}
});
*handle = Some(ticker_loop);
}
info!("WalPruneTicker started.");
}
/// Stops the ticker.
pub fn stop(&self) {
let mut handle = self.tick_handle.lock().unwrap();
if let Some(handle) = handle.take() {
handle.abort();
}
info!("WalPruneTicker stopped.");
}
}
impl Drop for WalPruneTicker {
fn drop(&mut self) {
self.stop();
}
}
/// [WalPruneManager] manages all remote WAL related tasks in metasrv.
///
/// [WalPruneManager] is responsible for:
/// 1. Registering the [WalPruneProcedure] loader in the procedure manager.
/// 2. Periodically receiving [Event::Tick] to submit [WalPruneProcedure]s that prune the remote WAL.
/// 3. Using a semaphore to limit the number of concurrent [WalPruneProcedure]s.
pub(crate) struct WalPruneManager {
/// Table metadata manager to restore topics from kvbackend.
table_metadata_manager: TableMetadataManagerRef,
/// Receives [Event]s.
receiver: Receiver<Event>,
/// Procedure manager.
procedure_manager: ProcedureManagerRef,
/// Tracker for running [WalPruneProcedure]s.
tracker: WalPruneProcedureTracker,
/// Semaphore to limit the number of concurrent [WalPruneProcedure]s.
semaphore: Arc<Semaphore>,
/// Context for [WalPruneProcedure].
wal_prune_context: WalPruneContext,
/// Trigger flush threshold for [WalPruneProcedure].
/// If `0`, never send flush requests.
trigger_flush_threshold: u64,
}
impl WalPruneManager {
/// Returns a new empty [WalPruneManager].
pub fn new(
table_metadata_manager: TableMetadataManagerRef,
limit: usize,
receiver: Receiver<Event>,
procedure_manager: ProcedureManagerRef,
wal_prune_context: WalPruneContext,
trigger_flush_threshold: u64,
) -> Self {
Self {
table_metadata_manager,
receiver,
procedure_manager,
wal_prune_context,
tracker: WalPruneProcedureTracker {
running_procedures: Arc::new(RwLock::new(HashSet::new())),
},
semaphore: Arc::new(Semaphore::new(limit)),
trigger_flush_threshold,
}
}
/// Starts the [WalPruneManager]: registers the [WalPruneProcedure] loader in the procedure manager and spawns the event loop.
pub async fn try_start(mut self) -> Result<()> {
let context = self.wal_prune_context.clone();
let tracker = self.tracker.clone();
self.procedure_manager
.register_loader(
WalPruneProcedure::TYPE_NAME,
Box::new(move |json| {
let tracker = tracker.clone();
WalPruneProcedure::from_json(json, &context, tracker).map(|p| Box::new(p) as _)
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: WalPruneProcedure::TYPE_NAME,
})?;
common_runtime::spawn_global(async move {
self.run().await;
});
info!("WalPruneProcedureManager Started.");
Ok(())
}
/// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
tokio::sync::mpsc::channel(1024)
}
/// Runs the main loop. Performs actions on received events.
///
/// - `Tick`: Submit [WalPruneProcedure]s for all topics, with at most `limit` running concurrently.
pub(crate) async fn run(&mut self) {
while let Some(event) = self.receiver.recv().await {
match event {
Event::Tick => self.handle_tick_request().await.unwrap_or_else(|e| {
error!(e; "Failed to handle tick request");
}),
}
}
}
/// Submits a [WalPruneProcedure] for the given topic name.
pub async fn submit_procedure(&self, topic_name: &str) -> Result<ProcedureId> {
let guard = self
.tracker
.insert_running_procedure(topic_name.to_string())
.with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
let procedure = WalPruneProcedure::new(
topic_name.to_string(),
self.wal_prune_context.clone(),
self.trigger_flush_threshold,
Some(guard),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
METRIC_META_REMOTE_WAL_PRUNE_EXECUTE
.with_label_values(&[topic_name])
.inc();
let procedure_manager = self.procedure_manager.clone();
let mut watcher = procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu)?;
watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu)?;
Ok(procedure_id)
}
async fn handle_tick_request(&self) -> Result<()> {
let topics = self.retrieve_sorted_topics().await?;
let mut tasks = Vec::with_capacity(topics.len());
for topic_name in topics.iter() {
tasks.push(async {
let _permit = self.semaphore.acquire().await.unwrap();
match self.submit_procedure(topic_name).await {
Ok(_) => {}
Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
warn!("Prune task for topic {} is already running", topic);
}
Err(e) => {
error!(
"Failed to submit prune task for topic {}: {}",
topic_name.clone(),
e
);
}
}
});
}
join_all(tasks).await;
Ok(())
}
/// Retrieve topics from the table metadata manager.
/// Since [WalPruneManager] submits procedures depending on the order of the topics, we should sort the topics.
/// TODO(CookiePie): Can register topics in memory instead of retrieving from the table metadata manager every time.
async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
self.table_metadata_manager
.topic_name_manager()
.range()
.await
.context(error::TableMetadataManagerSnafu)
}
}
#[cfg(test)]
mod test {
use std::assert_matches::assert_matches;
use common_meta::key::topic_name::TopicNameKey;
use common_wal::test_util::run_test_with_kafka_wal;
use tokio::time::{sleep, timeout};
use super::*;
use crate::procedure::wal_prune::test_util::TestEnv;
#[tokio::test]
async fn test_wal_prune_ticker() {
let (tx, mut rx) = WalPruneManager::channel();
let interval = Duration::from_millis(10);
let ticker = WalPruneTicker::new(interval, tx);
assert_eq!(ticker.name(), "WalPruneTicker");
for _ in 0..2 {
ticker.start();
sleep(2 * interval).await;
assert!(!rx.is_empty());
while let Ok(event) = rx.try_recv() {
assert_matches!(event, Event::Tick);
}
}
ticker.stop();
}
#[tokio::test]
async fn test_wal_prune_tracker_and_guard() {
let tracker = WalPruneProcedureTracker {
running_procedures: Arc::new(RwLock::new(HashSet::new())),
};
let topic_name = uuid::Uuid::new_v4().to_string();
{
let guard = tracker
.insert_running_procedure(topic_name.clone())
.unwrap();
assert_eq!(guard.topic_name, topic_name);
assert_eq!(guard.running_procedures.read().unwrap().len(), 1);
let result = tracker.insert_running_procedure(topic_name.clone());
assert!(result.is_none());
}
assert_eq!(tracker.running_procedures.read().unwrap().len(), 0);
}
async fn mock_wal_prune_manager(
broker_endpoints: Vec<String>,
limit: usize,
) -> (Sender<Event>, WalPruneManager) {
let test_env = TestEnv::new();
let (tx, rx) = WalPruneManager::channel();
let wal_prune_context = test_env.build_wal_prune_context(broker_endpoints).await;
(
tx,
WalPruneManager::new(
test_env.table_metadata_manager.clone(),
limit,
rx,
test_env.procedure_manager.clone(),
wal_prune_context,
0,
),
)
}
async fn mock_topics(manager: &WalPruneManager, topics: &[String]) {
let topic_name_keys = topics
.iter()
.map(|topic| TopicNameKey::new(topic))
.collect::<Vec<_>>();
manager
.table_metadata_manager
.topic_name_manager()
.batch_put(topic_name_keys)
.await
.unwrap();
}
#[tokio::test]
async fn test_wal_prune_manager() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
let limit = 6;
let (tx, manager) = mock_wal_prune_manager(broker_endpoints, limit).await;
let topics = (0..limit * 2)
.map(|_| uuid::Uuid::new_v4().to_string())
.collect::<Vec<_>>();
mock_topics(&manager, &topics).await;
let tracker = manager.tracker.clone();
let handler =
common_runtime::spawn_global(async move { manager.try_start().await.unwrap() });
handler.await.unwrap();
tx.send(Event::Tick).await.unwrap();
// Wait for at least one procedure to be submitted.
timeout(Duration::from_millis(100), async move { tracker.len() > 0 })
.await
.unwrap();
})
})
.await;
}
}

View File

@@ -0,0 +1,94 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::{LeaderRegionRegistry, LeaderRegionRegistryRef};
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::build_kafka_client;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::ProcedureManagerRef;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use rskafka::client::Client;
use crate::procedure::test_util::MailboxContext;
use crate::procedure::wal_prune::Context as WalPruneContext;
pub struct TestEnv {
pub table_metadata_manager: TableMetadataManagerRef,
pub leader_region_registry: LeaderRegionRegistryRef,
pub procedure_manager: ProcedureManagerRef,
pub mailbox: MailboxContext,
pub server_addr: String,
}
impl TestEnv {
pub fn new() -> Self {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let procedure_manager = Arc::new(LocalManager::new(
ManagerConfig::default(),
state_store,
poison_manager,
));
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
Self {
table_metadata_manager,
leader_region_registry,
procedure_manager,
mailbox: mailbox_ctx,
server_addr: "localhost".to_string(),
}
}
async fn build_kafka_client(broker_endpoints: Vec<String>) -> Arc<Client> {
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
Arc::new(build_kafka_client(&config).await.unwrap())
}
pub async fn build_wal_prune_context(&self, broker_endpoints: Vec<String>) -> WalPruneContext {
let client = Self::build_kafka_client(broker_endpoints).await;
WalPruneContext {
client,
table_metadata_manager: self.table_metadata_manager.clone(),
leader_region_registry: self.leader_region_registry.clone(),
server_addr: self.server_addr.to_string(),
mailbox: self.mailbox.mailbox().clone(),
}
}
}

View File

@@ -195,6 +195,8 @@ mod tests {
manifest_version: 0,
flushed_entry_id: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
}],
..Default::default()
}
@@ -220,6 +222,8 @@ mod tests {
manifest_version: 0,
flushed_entry_id: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
}],
..Default::default()
}
@@ -245,6 +249,8 @@ mod tests {
manifest_version: 0,
flushed_entry_id: 0,
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
}],
..Default::default()
}

View File

@@ -18,4 +18,5 @@ broker_endpoints = {kafka_wal_broker_endpoints | unescaped}
num_topics = 3
selector_type = "round_robin"
topic_name_prefix = "distributed_test_greptimedb_wal_topic"
auto_prune_topic_records = true
{{ endif }}