From f73b61e7672348886863c162ff2df4c443826293 Mon Sep 17 00:00:00 2001 From: Yuhan Wang Date: Thu, 3 Apr 2025 16:11:51 +0800 Subject: [PATCH] feat(remote-wal): add remote wal prune procedure (#5714) * feat: add remote wal prune procedure * feat: add retry logic and remove rollback * chore: simplify the logic * fix: remove REMOTE_WAL_LOCK * fix: use in-memory kv * perf: O(n) judgement * chore: add single write lock * test: add unit test * chore: remove unused function * chore: update comments * chore: apply comments * chore: apply comments --- Cargo.lock | 2 + src/common/meta/src/key.rs | 4 + src/common/meta/src/lock_key.rs | 37 ++ src/common/meta/src/region_registry.rs | 15 + src/common/meta/src/wal_options_allocator.rs | 2 +- .../wal_options_allocator/topic_creator.rs | 12 +- src/log-store/src/kafka.rs | 1 + src/log-store/src/kafka/client_manager.rs | 2 +- src/meta-srv/Cargo.toml | 4 +- src/meta-srv/src/error.rs | 29 +- src/meta-srv/src/procedure.rs | 1 + src/meta-srv/src/procedure/test_util.rs | 104 +++++ src/meta-srv/src/procedure/wal_prune.rs | 434 ++++++++++++++++++ src/store-api/src/region_engine.rs | 4 +- 14 files changed, 643 insertions(+), 8 deletions(-) create mode 100644 src/meta-srv/src/procedure/wal_prune.rs diff --git a/Cargo.lock b/Cargo.lock index 54be2d472e..bc55fc6722 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6720,12 +6720,14 @@ dependencies = [ "itertools 0.14.0", "lazy_static", "local-ip-address", + "log-store", "once_cell", "parking_lot 0.12.3", "prometheus", "prost 0.13.3", "rand 0.9.0", "regex", + "rskafka", "serde", "serde_json", "servers", diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 37aa3ec3af..ada1eb42a9 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -512,6 +512,10 @@ impl TableMetadataManager { &self.table_route_manager } + pub fn topic_region_manager(&self) -> &TopicRegionManager { + &self.topic_region_manager + } + #[cfg(feature = "testing")] pub fn kv_backend(&self) -> &KvBackendRef { &self.kv_backend diff --git a/src/common/meta/src/lock_key.rs b/src/common/meta/src/lock_key.rs index 4ae2696d83..8ed9cf48b5 100644 --- a/src/common/meta/src/lock_key.rs +++ b/src/common/meta/src/lock_key.rs @@ -27,6 +27,7 @@ const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock"; const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock"; const REGION_LOCK_PREFIX: &str = "__region_lock"; const FLOW_LOCK_PREFIX: &str = "__flow_lock"; +const REMOTE_WAL_LOCK_PREFIX: &str = "__remote_wal_lock"; /// [CatalogLock] acquires the lock on the tenant level. pub enum CatalogLock<'a> { @@ -231,6 +232,31 @@ impl From for StringKey { } } +/// [RemoteWalLock] acquires the lock on the remote wal topic level. 
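+///
+/// A hedged usage sketch (the topic name and the surrounding procedure are illustrative,
+/// not part of this module):
+///
+/// ```ignore
+/// // Exclusive key while pruning a topic; blocks other writers on the same topic.
+/// let write_key: StringKey = RemoteWalLock::Write("my_topic".to_string()).into();
+/// // Shared key for procedures that only need the topic to stay stable.
+/// let read_key: StringKey = RemoteWalLock::Read("my_topic".to_string()).into();
+/// let lock_key = LockKey::new(vec![write_key]);
+/// ```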
+pub enum RemoteWalLock { + Read(String), + Write(String), +} + +impl Display for RemoteWalLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let key = match self { + RemoteWalLock::Read(s) => s, + RemoteWalLock::Write(s) => s, + }; + write!(f, "{}/{}", REMOTE_WAL_LOCK_PREFIX, key) + } +} + +impl From for StringKey { + fn from(value: RemoteWalLock) -> Self { + match value { + RemoteWalLock::Write(_) => StringKey::Exclusive(value.to_string()), + RemoteWalLock::Read(_) => StringKey::Share(value.to_string()), + } + } +} + #[cfg(test)] mod tests { use common_procedure::StringKey; @@ -308,5 +334,16 @@ mod tests { string_key, StringKey::Exclusive(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id)) ); + // The remote wal lock + let string_key: StringKey = RemoteWalLock::Read("foo".to_string()).into(); + assert_eq!( + string_key, + StringKey::Share(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo")) + ); + let string_key: StringKey = RemoteWalLock::Write("foo".to_string()).into(); + assert_eq!( + string_key, + StringKey::Exclusive(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo")) + ); } } diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs index 267cf2d10b..76fb271f52 100644 --- a/src/common/meta/src/region_registry.rs +++ b/src/common/meta/src/region_registry.rs @@ -95,6 +95,21 @@ impl LeaderRegionManifestInfo { } => *data_flushed_entry_id, } } + + /// Returns the minimum flushed entry id of the leader region. + /// It is used to determine the minimum flushed entry id that can be pruned in remote wal. + pub fn min_flushed_entry_id(&self) -> u64 { + match self { + LeaderRegionManifestInfo::Mito { + flushed_entry_id, .. + } => *flushed_entry_id, + LeaderRegionManifestInfo::Metric { + data_flushed_entry_id, + metadata_flushed_entry_id, + .. + } => (*data_flushed_entry_id).min(*metadata_flushed_entry_id), + } + } } pub type LeaderRegionRegistryRef = Arc; diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index b4ac0fea3d..2aba2a5ee3 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -30,7 +30,7 @@ use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result}; use crate::key::NAME_PATTERN_REGEX; use crate::kv_backend::KvBackendRef; use crate::leadership_notifier::LeadershipChangeListener; -use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator; +pub use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator; use crate::wal_options_allocator::topic_pool::KafkaTopicPool; /// Allocates wal options in region granularity. diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs index a9d655c737..71cdd83592 100644 --- a/src/common/meta/src/wal_options_allocator/topic_creator.rs +++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use common_telemetry::{error, info}; use common_wal::config::kafka::MetasrvKafkaConfig; use rskafka::client::error::Error as RsKafkaError; @@ -32,9 +34,11 @@ use crate::error::{ // The `DEFAULT_PARTITION` refers to the index of the partition. const DEFAULT_PARTITION: i32 = 0; +type KafkaClientRef = Arc; + /// Creates topics in kafka. 
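+/// The client is stored behind an `Arc` so it can be shared with other components
+/// (for example the remote WAL prune procedure) instead of being rebuilt per use.
+///
+/// A hedged construction sketch (the `config` value is illustrative):
+///
+/// ```ignore
+/// let creator = build_kafka_topic_creator(&config).await?;
+/// let client = creator.client().clone(); // Arc-shared rskafka client
+/// ```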
pub struct KafkaTopicCreator { - client: Client, + client: KafkaClientRef, /// The number of partitions per topic. num_partitions: i32, /// The replication factor of each topic. @@ -44,6 +48,10 @@ pub struct KafkaTopicCreator { } impl KafkaTopicCreator { + pub fn client(&self) -> &KafkaClientRef { + &self.client + } + async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> { let controller = client .controller_client() @@ -151,7 +159,7 @@ pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result; diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 4803664ae0..15830a4f05 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -8,7 +8,7 @@ license.workspace = true mock = [] pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend", "dep:deadpool-postgres", "dep:deadpool"] mysql_kvbackend = ["dep:sqlx", "common-meta/mysql_kvbackend"] -testing = [] +testing = ["common-wal/testing"] [lints] workspace = true @@ -52,12 +52,14 @@ humantime-serde.workspace = true hyper-util = { workspace = true, features = ["tokio"] } itertools.workspace = true lazy_static.workspace = true +log-store.workspace = true once_cell.workspace = true parking_lot.workspace = true prometheus.workspace = true prost.workspace = true rand.workspace = true regex.workspace = true +rskafka.workspace = true serde.workspace = true serde_json.workspace = true servers.workspace = true diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 6038bc6901..4e9fa6a26c 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -787,6 +787,31 @@ pub enum Error { location: Location, source: common_meta::error::Error, }, + + #[snafu(display( + "Failed to build a Kafka partition client, topic: {}, partition: {}", + topic, + partition + ))] + BuildPartitionClient { + topic: String, + partition: i32, + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display("Failed to delete record from Kafka"))] + DeleteRecord { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + topic: String, + partition: i32, + offset: u64, + }, } impl Error { @@ -834,7 +859,9 @@ impl ErrorExt for Error { | Error::ExceededDeadline { .. } | Error::ChooseItems { .. } | Error::FlowStateHandler { .. } - | Error::BuildWalOptionsAllocator { .. } => StatusCode::Internal, + | Error::BuildWalOptionsAllocator { .. } + | Error::BuildPartitionClient { .. } + | Error::DeleteRecord { .. } => StatusCode::Internal, Error::Unsupported { .. } => StatusCode::Unsupported, diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index a10e54918b..88869d8482 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -24,6 +24,7 @@ pub mod test_util; #[cfg(test)] mod tests; pub mod utils; +pub mod wal_prune; #[derive(Clone)] pub struct ProcedureManagerListenerAdapter(pub ProcedureManagerRef); diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 800caceb34..1b3616a8be 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -12,13 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; + use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage}; use common_meta::instruction::{ DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, }; +use common_meta::key::table_route::TableRouteValue; +use common_meta::key::test_utils::new_test_table_info; +use common_meta::key::TableMetadataManagerRef; +use common_meta::peer::Peer; +use common_meta::region_registry::{ + LeaderRegion, LeaderRegionManifestInfo, LeaderRegionRegistryRef, +}; +use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::Sequence; use common_time::util::current_time_millis; +use common_wal::options::{KafkaWalOptions, WalOptions}; +use store_api::logstore::EntryId; +use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use crate::error::Result; @@ -152,3 +165,94 @@ pub fn new_upgrade_region_reply( )), } } + +pub async fn new_wal_prune_metadata( + table_metadata_manager: TableMetadataManagerRef, + leader_region_registry: LeaderRegionRegistryRef, + n_region: u32, + n_table: u32, + offsets: &[i64], + threshold: u64, + topic: String, +) -> (EntryId, Vec) { + let datanode_id = 1; + let from_peer = Peer::empty(datanode_id); + let mut min_last_entry_id = 0; + let mut region_entry_ids = HashMap::with_capacity(n_table as usize * n_region as usize); + for table_id in 0..n_table { + let region_ids = (0..n_region) + .map(|i| RegionId::new(table_id, i)) + .collect::>(); + let table_info = new_test_table_info(table_id, 0..n_region).into(); + let region_routes = region_ids + .iter() + .map(|region_id| RegionRoute { + region: Region::new_test(*region_id), + leader_peer: Some(from_peer.clone()), + ..Default::default() + }) + .collect::>(); + let wal_options = WalOptions::Kafka(KafkaWalOptions { + topic: topic.clone(), + }); + let wal_options = serde_json::to_string(&wal_options).unwrap(); + let region_wal_options: HashMap = (0..n_region) + .map(|region_number| (region_number, wal_options.clone())) + .collect(); + + table_metadata_manager + .create_table_metadata( + table_info, + TableRouteValue::physical(region_routes), + region_wal_options, + ) + .await + .unwrap(); + + let current_region_entry_ids = region_ids + .iter() + .map(|region_id| { + let rand_n = rand::random::() as usize; + let current_last_entry_id = offsets[rand_n % offsets.len()] as u64; + min_last_entry_id = min_last_entry_id.min(current_last_entry_id); + (*region_id, current_last_entry_id) + }) + .collect::>(); + region_entry_ids.extend(current_region_entry_ids.clone()); + update_in_memory_region_last_entry_id(&leader_region_registry, current_region_entry_ids) + .await + .unwrap(); + } + + let regions_to_flush = region_entry_ids + .iter() + .filter_map(|(region_id, last_entry_id)| { + if last_entry_id - min_last_entry_id > threshold { + Some(*region_id) + } else { + None + } + }) + .collect::>(); + (min_last_entry_id, regions_to_flush) +} + +pub async fn update_in_memory_region_last_entry_id( + leader_region_registry: &LeaderRegionRegistryRef, + region_entry_ids: HashMap, +) -> Result<()> { + let mut key_values = Vec::with_capacity(region_entry_ids.len()); + for (region_id, flushed_entry_id) in region_entry_ids { + let value = LeaderRegion { + datanode_id: 1, + manifest: LeaderRegionManifestInfo::Mito { + manifest_version: 0, + flushed_entry_id, + }, + }; + key_values.push((region_id, value)); + } + leader_region_registry.batch_put(key_values); + + Ok(()) +} diff --git a/src/meta-srv/src/procedure/wal_prune.rs 
b/src/meta-srv/src/procedure/wal_prune.rs new file mode 100644 index 0000000000..b29a44a7fb --- /dev/null +++ b/src/meta-srv/src/procedure/wal_prune.rs @@ -0,0 +1,434 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use common_error::ext::BoxedError; +use common_meta::key::TableMetadataManagerRef; +use common_meta::lock_key::RemoteWalLock; +use common_meta::region_registry::LeaderRegionRegistryRef; +use common_procedure::error::ToJsonSnafu; +use common_procedure::{ + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, + Result as ProcedureResult, Status, StringKey, +}; +use common_telemetry::warn; +use log_store::kafka::DEFAULT_PARTITION; +use rskafka::client::partition::UnknownTopicHandling; +use rskafka::client::Client; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::logstore::EntryId; +use store_api::storage::RegionId; + +use crate::error::{self, BuildPartitionClientSnafu, DeleteRecordSnafu, TableMetadataManagerSnafu}; +use crate::Result; + +type KafkaClientRef = Arc; + +const TIMEOUT: i32 = 1000; + +/// The state of WAL pruning. +#[derive(Debug, Serialize, Deserialize)] +pub enum WalPruneState { + Prepare, + Prune, +} + +pub struct Context { + /// The Kafka client. + client: KafkaClientRef, + /// The table metadata manager. + table_metadata_manager: TableMetadataManagerRef, + leader_region_registry: LeaderRegionRegistryRef, +} + +/// The data of WAL pruning. +#[derive(Serialize, Deserialize)] +pub struct WalPruneData { + /// The topic name to prune. + pub topic: String, + /// The minimum flush entry id for topic, which is used to prune the WAL. + /// If the topic has no region, the value is set to `None`. + pub min_flushed_entry_id: EntryId, + /// The state. + pub state: WalPruneState, +} + +/// The procedure to prune WAL. +pub struct WalPruneProcedure { + pub data: WalPruneData, + pub context: Context, +} + +impl WalPruneProcedure { + const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune"; + + pub fn new(topic: String, context: Context) -> Self { + Self { + data: WalPruneData { + topic, + min_flushed_entry_id: 0, + state: WalPruneState::Prepare, + }, + context, + } + } + + pub fn from_json(json: &str, context: Context) -> ProcedureResult { + let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?; + Ok(Self { data, context }) + } + + /// Calculate the last entry id to prune for each topic. 
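+    ///
+    /// A sketch of the flow implemented below (no extra API is introduced here):
+    /// 1. Collect all regions bound to `self.data.topic` from the topic-region map.
+    /// 2. Look up each region's flushed entry id in the in-memory leader region registry.
+    /// 3. If any region has not been collected via heartbeat yet, abort with `Status::done()`
+    ///    rather than risk pruning entries that are still needed.
+    /// 4. Otherwise record the minimum flushed entry id and switch to `WalPruneState::Prune`.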
+ pub async fn on_prepare(&mut self) -> Result { + let region_ids = self + .context + .table_metadata_manager + .topic_region_manager() + .regions(&self.data.topic) + .await + .context(TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: "Failed to get topic-region map", + })?; + let flush_entry_ids_map: HashMap<_, _> = self + .context + .leader_region_registry + .batch_get(region_ids.iter().cloned()) + .into_iter() + .map(|(region_id, region)| { + let flushed_entry_id = region.manifest.min_flushed_entry_id(); + (region_id, flushed_entry_id) + }) + .collect(); + + if region_ids.is_empty() { + // No regions to prune. + return Ok(Status::done()); + } + // Check if the `flush_entry_ids_map` contains all region ids. + let non_collected_region_ids = + check_heartbeat_collected_region_ids(®ion_ids, &flush_entry_ids_map); + if !non_collected_region_ids.is_empty() { + // The heartbeat collected region ids do not contain all region ids in the topic-region map. + // In this case, we should not prune the WAL. + warn!("The heartbeat collected region ids do not contain all region ids in the topic-region map. Aborting the WAL prune procedure. + topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids); + return Ok(Status::done()); + } + + // Safety: `flush_entry_ids_map` are not empty. + self.data.min_flushed_entry_id = *(flush_entry_ids_map.values().min().unwrap()); + self.data.state = WalPruneState::Prune; + Ok(Status::executing(true)) + } + + /// Prune the WAL. + pub async fn on_prune(&mut self) -> Result { + // Safety: last_entry_ids are loaded in on_prepare. + let partition_client = self + .context + .client + .partition_client( + self.data.topic.clone(), + DEFAULT_PARTITION, + UnknownTopicHandling::Retry, + ) + .await + .context(BuildPartitionClientSnafu { + topic: self.data.topic.clone(), + partition: DEFAULT_PARTITION, + })?; + + partition_client + .delete_records(self.data.min_flushed_entry_id as i64, TIMEOUT) + .await + .context(DeleteRecordSnafu { + topic: self.data.topic.clone(), + partition: DEFAULT_PARTITION, + offset: self.data.min_flushed_entry_id, + }) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: "Failed to delete records", + })?; + + // TODO(CookiePie): Persist the minimum flushed entry id to the table metadata manager. + Ok(Status::done()) + } +} + +#[async_trait::async_trait] +impl Procedure for WalPruneProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + fn rollback_supported(&self) -> bool { + false + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.data.state; + + match state { + WalPruneState::Prepare => self.on_prepare().await, + WalPruneState::Prune => self.on_prune().await, + } + .map_err(|e| { + if e.is_retryable() { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } + }) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + /// WAL prune procedure will read the topic-region map from the table metadata manager, + /// which are modified by `DROP [TABLE|DATABASE]` and `CREATE [TABLE]` operations. + /// But the modifications are atomic, so it does not conflict with the procedure. + /// It only abort the procedure sometimes since the `check_heartbeat_collected_region_ids` fails. 
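+    ///
+    /// For illustration (the topic name is hypothetical): pruning `greptime_topic_0` holds the
+    /// exclusive key `__remote_wal_lock/greptime_topic_0`, so two prune procedures for the
+    /// same topic never run concurrently, while procedures on other topics are unaffected.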
+ fn lock_key(&self) -> LockKey { + let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into(); + LockKey::new(vec![lock_key]) + } +} + +/// Check if the heartbeat collected region ids contains all region ids in the topic-region map. +fn check_heartbeat_collected_region_ids( + region_ids: &[RegionId], + heartbeat_collected_region_ids: &HashMap, +) -> Vec { + let mut non_collected_region_ids = Vec::new(); + for region_id in region_ids { + if !heartbeat_collected_region_ids.contains_key(region_id) { + non_collected_region_ids.push(*region_id); + } + } + non_collected_region_ids +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::region_registry::LeaderRegionRegistry; + use common_meta::wal_options_allocator::build_kafka_topic_creator; + use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; + use common_wal::config::kafka::MetasrvKafkaConfig; + use common_wal::test_util::run_test_with_kafka_wal; + use rskafka::record::Record; + + use super::*; + use crate::procedure::test_util::new_wal_prune_metadata; + + struct TestEnv { + table_metadata_manager: TableMetadataManagerRef, + leader_region_registry: LeaderRegionRegistryRef, + } + + impl TestEnv { + fn new() -> Self { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let leader_region_registry = Arc::new(LeaderRegionRegistry::new()); + Self { + table_metadata_manager, + leader_region_registry, + } + } + + fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } + + fn leader_region_registry(&self) -> &LeaderRegionRegistryRef { + &self.leader_region_registry + } + } + + /// Mock a test env for testing. + /// Including: + /// 1. Create a test env with a mailbox, a table metadata manager and a in-memory kv backend. + /// 2. Prepare some data in the table metadata manager and in-memory kv backend. + /// 3. Generate a `WalPruneProcedure` with the test env. + /// 4. Return the test env, the procedure, the minimum last entry id to prune and the regions to flush. + async fn mock_test_env( + topic: String, + broker_endpoints: Vec, + env: &TestEnv, + ) -> (WalPruneProcedure, u64, Vec) { + // Creates a topic manager. 
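+        // (Test-only note: the replication factor simply mirrors the number of brokers
+        // handed in by `run_test_with_kafka_wal`; every other Kafka setting keeps its
+        // default value.)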
+ let kafka_topic = KafkaTopicConfig { + replication_factor: broker_endpoints.len() as i16, + ..Default::default() + }; + let config = MetasrvKafkaConfig { + connection: KafkaConnectionConfig { + broker_endpoints, + ..Default::default() + }, + kafka_topic, + ..Default::default() + }; + let topic_creator = build_kafka_topic_creator(&config).await.unwrap(); + let table_metadata_manager = env.table_metadata_manager().clone(); + let leader_region_registry = env.leader_region_registry().clone(); + let offsets = mock_wal_entries(topic_creator.client().clone(), &topic, 10).await; + + let (min_last_entry_id, regions_to_flush) = new_wal_prune_metadata( + table_metadata_manager.clone(), + leader_region_registry.clone(), + 10, + 5, + &offsets, + 10, + topic.clone(), + ) + .await; + + let context = Context { + client: topic_creator.client().clone(), + table_metadata_manager, + leader_region_registry, + }; + + let wal_prune_procedure = WalPruneProcedure::new(topic, context); + (wal_prune_procedure, min_last_entry_id, regions_to_flush) + } + + fn record(i: usize) -> Record { + let key = format!("key_{i}"); + let value = format!("value_{i}"); + Record { + key: Some(key.into()), + value: Some(value.into()), + timestamp: chrono::Utc::now(), + headers: Default::default(), + } + } + + async fn mock_wal_entries( + client: KafkaClientRef, + topic_name: &str, + n_entries: usize, + ) -> Vec { + let controller_client = client.controller_client().unwrap(); + let _ = controller_client + .create_topic(topic_name, 1, 1, 5_000) + .await; + let partition_client = client + .partition_client(topic_name, 0, UnknownTopicHandling::Retry) + .await + .unwrap(); + let mut offsets = Vec::with_capacity(n_entries); + for i in 0..n_entries { + let record = vec![record(i)]; + let offset = partition_client + .produce( + record, + rskafka::client::partition::Compression::NoCompression, + ) + .await + .unwrap(); + offsets.extend(offset); + } + offsets + } + + async fn check_entry_id_existence( + client: KafkaClientRef, + topic_name: &str, + entry_id: i64, + ) -> bool { + let partition_client = client + .partition_client(topic_name, 0, UnknownTopicHandling::Retry) + .await + .unwrap(); + let (records, _high_watermark) = partition_client + .fetch_records(entry_id, 0..10001, 5_000) + .await + .unwrap(); + !records.is_empty() + } + + async fn delete_topic(client: KafkaClientRef, topic_name: &str) { + let controller_client = client.controller_client().unwrap(); + controller_client + .delete_topic(topic_name, 5_000) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_procedure_execution() { + run_test_with_kafka_wal(|broker_endpoints| { + Box::pin(async { + common_telemetry::init_default_ut_logging(); + let topic_name = "greptime_test_topic".to_string(); + let env = TestEnv::new(); + let (mut procedure, min_last_entry_id, _) = + mock_test_env(topic_name.clone(), broker_endpoints, &env).await; + + // Step 1: Test `on_prepare`. + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + assert_matches!(procedure.data.state, WalPruneState::Prune); + assert_eq!(procedure.data.min_flushed_entry_id, min_last_entry_id); + + // Step 2: Test `on_prune`. + let status = procedure.on_prune().await.unwrap(); + assert_matches!(status, Status::Done { output: None }); + // Check if the entry ids after `min_flushed_entry_id` still exist. 
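+                // (Kafka's DeleteRecords advances the log start offset to the requested
+                // offset: entries at smaller offsets become unreadable, while the entry at
+                // `min_flushed_entry_id` itself must remain fetchable.)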
+                assert!(
+                    check_entry_id_existence(
+                        procedure.context.client.clone(),
+                        &topic_name,
+                        procedure.data.min_flushed_entry_id as i64,
+                    )
+                    .await
+                );
+                // Check if the entries before `min_flushed_entry_id` are deleted.
+                assert!(
+                    procedure.data.min_flushed_entry_id == 0
+                        || !check_entry_id_existence(
+                            procedure.context.client.clone(),
+                            &topic_name,
+                            procedure.data.min_flushed_entry_id as i64 - 1,
+                        )
+                        .await
+                );
+
+                // `check_heartbeat_collected_region_ids` fails.
+                // Should log a warning and return `Status::Done`.
+                procedure.context.leader_region_registry.reset();
+                let status = procedure.on_prepare().await.unwrap();
+                assert_matches!(status, Status::Done { output: None });
+
+                // Clean up the topic.
+                delete_topic(procedure.context.client, &topic_name).await;
+            })
+        })
+        .await;
+    }
+}
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index 022ddb80df..62a310700c 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -409,9 +409,9 @@ impl RegionManifestInfo {
                 flushed_entry_id, ..
             } => *flushed_entry_id,
             RegionManifestInfo::Metric {
-                metadata_flushed_entry_id,
+                data_flushed_entry_id,
                 ..
-            } => *metadata_flushed_entry_id,
+            } => *data_flushed_entry_id,
         }
     }