From 6692957e08ac96b7e22f3a997ad9b4d1e81efd4d Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 21 Aug 2025 19:09:31 +0800 Subject: [PATCH] refactor: simplify WAL Pruning procedure part2 (#6782) refactor: simplify prune wal procedure Signed-off-by: WenyXu --- src/meta-srv/src/metasrv/builder.rs | 1 - src/meta-srv/src/procedure/test_util.rs | 1 + src/meta-srv/src/procedure/wal_prune.rs | 266 +++++------------- .../src/procedure/wal_prune/manager.rs | 103 ++++++- src/meta-srv/src/procedure/wal_prune/utils.rs | 254 +++++++++++++++++ 5 files changed, 407 insertions(+), 218 deletions(-) create mode 100644 src/meta-srv/src/procedure/wal_prune/utils.rs diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 769dbd883f..efb7a5ad4d 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -439,7 +439,6 @@ impl MetasrvBuilder { leader_region_registry: leader_region_registry.clone(), }; let wal_prune_manager = WalPruneManager::new( - table_metadata_manager.clone(), remote_wal_options.auto_prune_parallelism, rx, procedure_manager.clone(), diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 17475b4b41..ab89808770 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -186,6 +186,7 @@ pub fn new_upgrade_region_reply( } } +/// Mock the test data for WAL pruning. pub async fn new_wal_prune_metadata( table_metadata_manager: TableMetadataManagerRef, leader_region_registry: LeaderRegionRegistryRef, diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs index 37fede668e..ca68fd0610 100644 --- a/src/meta-srv/src/procedure/wal_prune.rs +++ b/src/meta-srv/src/procedure/wal_prune.rs @@ -15,10 +15,9 @@ pub(crate) mod manager; #[cfg(test)] mod test_util; +pub(crate) mod utils; -use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; use common_error::ext::BoxedError; use common_meta::key::TableMetadataManagerRef; @@ -30,33 +29,20 @@ use common_procedure::{ Result as ProcedureResult, Status, StringKey, }; use common_telemetry::{info, warn}; -use itertools::{Itertools, MinMaxResult}; -use log_store::kafka::DEFAULT_PARTITION; use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker}; -use rskafka::client::partition::{OffsetAt, UnknownTopicHandling}; use rskafka::client::Client; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::logstore::EntryId; -use store_api::storage::RegionId; -use crate::error::{ - self, BuildPartitionClientSnafu, DeleteRecordsSnafu, TableMetadataManagerSnafu, - UpdateTopicNameValueSnafu, +use crate::error::{self}; +use crate::procedure::wal_prune::utils::{ + delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id, }; use crate::Result; pub type KafkaClientRef = Arc; -const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5); - -/// The state of WAL pruning. -#[derive(Debug, Serialize, Deserialize)] -pub enum WalPruneState { - Prepare, - Prune, -} - #[derive(Clone)] pub struct Context { /// The Kafka client. @@ -74,8 +60,6 @@ pub struct WalPruneData { pub topic: String, /// The minimum flush entry id for topic, which is used to prune the WAL. pub prunable_entry_id: EntryId, - /// The state. - pub state: WalPruneState, } /// The procedure to prune WAL. 
@@ -88,12 +72,16 @@ pub struct WalPruneProcedure {
 impl WalPruneProcedure {
     const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
 
-    pub fn new(topic: String, context: Context, guard: Option<WalPruneProcedureGuard>) -> Self {
+    pub fn new(
+        context: Context,
+        guard: Option<WalPruneProcedureGuard>,
+        topic: String,
+        prunable_entry_id: u64,
+    ) -> Self {
         Self {
             data: WalPruneData {
                 topic,
-                prunable_entry_id: 0,
-                state: WalPruneState::Prepare,
+                prunable_entry_id,
             },
             context,
             _guard: guard,
@@ -114,150 +102,59 @@ impl WalPruneProcedure {
         })
     }
 
-    /// Prepare the entry id to prune and regions to flush.
-    ///
-    /// Retry:
-    /// - Failed to retrieve any metadata.
-    pub async fn on_prepare(&mut self) -> Result<Status> {
-        let region_ids = self
-            .context
-            .table_metadata_manager
-            .topic_region_manager()
-            .regions(&self.data.topic)
-            .await
-            .context(TableMetadataManagerSnafu)
-            .map_err(BoxedError::new)
-            .with_context(|_| error::RetryLaterWithSourceSnafu {
-                reason: "Failed to get topic-region map",
-            })?;
-        let prunable_entry_ids_map: HashMap<_, _> = self
-            .context
-            .leader_region_registry
-            .batch_get(region_ids.iter().cloned())
-            .into_iter()
-            .map(|(region_id, region)| {
-                let prunable_entry_id = region.manifest.prunable_entry_id();
-                (region_id, prunable_entry_id)
-            })
-            .collect();
-
-        // Check if the `prunable_entry_ids_map` contains all region ids.
-        let non_collected_region_ids =
-            check_heartbeat_collected_region_ids(&region_ids, &prunable_entry_ids_map);
-        if !non_collected_region_ids.is_empty() {
-            // The heartbeat collected region ids do not contain all region ids in the topic-region map.
-            // In this case, we should not prune the WAL.
-            warn!("The heartbeat collected region ids do not contain all region ids in the topic-region map. Aborting the WAL prune procedure.
-            topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids);
-            return Ok(Status::done());
-        }
-
-        let min_max_result = prunable_entry_ids_map.values().minmax();
-        match min_max_result {
-            MinMaxResult::NoElements => {
-                return Ok(Status::done());
-            }
-            MinMaxResult::OneElement(prunable_entry_id) => {
-                self.data.prunable_entry_id = *prunable_entry_id;
-            }
-            MinMaxResult::MinMax(min_prunable_entry_id, _) => {
-                self.data.prunable_entry_id = *min_prunable_entry_id;
-            }
-        };
-        self.data.state = WalPruneState::Prune;
-        Ok(Status::executing(false))
-    }
-
     /// Prune the WAL and persist the minimum prunable entry id.
     ///
     /// Retry:
    /// - Failed to update the minimum prunable entry id in kvbackend.
    /// - Failed to delete records.
pub async fn on_prune(&mut self) -> Result { - let partition_client = self - .context - .client - .partition_client( - self.data.topic.clone(), - DEFAULT_PARTITION, - UnknownTopicHandling::Retry, - ) - .await - .context(BuildPartitionClientSnafu { - topic: self.data.topic.clone(), - partition: DEFAULT_PARTITION, - })?; - let earliest_offset = partition_client - .get_offset(OffsetAt::Earliest) - .await - .context(error::GetOffsetSnafu { - topic: self.data.topic.clone(), - })?; - let latest_offset = partition_client - .get_offset(OffsetAt::Latest) - .await - .context(error::GetOffsetSnafu { - topic: self.data.topic.clone(), - })?; - if self.data.prunable_entry_id <= earliest_offset as u64 { + let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?; + let (earliest_offset, latest_offset) = + get_offsets_for_topic(&partition_client, &self.data.topic).await?; + if self.data.prunable_entry_id <= earliest_offset { warn!( - "The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}", - self.data.topic, self.data.prunable_entry_id, earliest_offset as u64 + "The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}", + self.data.topic, + self.data.prunable_entry_id, + earliest_offset, + latest_offset ); return Ok(Status::done()); } - // Should update the min prunable entry id in the kv backend before deleting records. - // Otherwise, when a datanode restarts, it will not be able to find the wal entries. - let prev = self - .context - .table_metadata_manager - .topic_name_manager() - .get(&self.data.topic) - .await - .context(TableMetadataManagerSnafu) - .map_err(BoxedError::new) - .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!("Failed to get TopicNameValue, topic: {}", self.data.topic), - })?; - self.context - .table_metadata_manager - .topic_name_manager() - .update(&self.data.topic, self.data.prunable_entry_id, prev) - .await - .context(UpdateTopicNameValueSnafu { - topic: &self.data.topic, - }) - .map_err(BoxedError::new) - .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!( - "Failed to update pruned entry id for topic: {}", - self.data.topic - ), - })?; - partition_client - .delete_records( - // notice here no "+1" is needed because the offset arg is exclusive, and it's defensive programming just in case somewhere else have a off by one error, see https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection) which we use to get the end offset from high watermark - self.data.prunable_entry_id as i64, - DELETE_RECORDS_TIMEOUT.as_millis() as i32, - ) - .await - .context(DeleteRecordsSnafu { - topic: &self.data.topic, - partition: DEFAULT_PARTITION, - offset: self.data.prunable_entry_id, - }) - .map_err(BoxedError::new) - .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!( - "Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}", - self.data.topic, self.data.prunable_entry_id, latest_offset as u64 - ), - })?; + // Delete records. 
+ delete_records( + &partition_client, + &self.data.topic, + self.data.prunable_entry_id, + ) + .await + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!( + "Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}", + self.data.topic, self.data.prunable_entry_id, latest_offset + ), + })?; + + // Update the pruned entry id for the topic. + update_pruned_entry_id( + &self.context.table_metadata_manager, + &self.data.topic, + self.data.prunable_entry_id, + ) + .await + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!( + "Failed to update pruned entry id for topic: {}", + self.data.topic + ), + })?; info!( "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}", - self.data.topic, self.data.prunable_entry_id, latest_offset as u64 + self.data.topic, self.data.prunable_entry_id, latest_offset ); Ok(Status::done()) } @@ -274,13 +171,7 @@ impl Procedure for WalPruneProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let state = &self.data.state; - - match state { - WalPruneState::Prepare => self.on_prepare().await, - WalPruneState::Prune => self.on_prune().await, - } - .map_err(|e| { + self.on_prune().await.map_err(|e| { if e.is_retryable() { ProcedureError::retry_later(e) } else { @@ -303,26 +194,13 @@ impl Procedure for WalPruneProcedure { } } -/// Check if the heartbeat collected region ids contains all region ids in the topic-region map. -fn check_heartbeat_collected_region_ids( - region_ids: &[RegionId], - heartbeat_collected_region_ids: &HashMap, -) -> Vec { - let mut non_collected_region_ids = Vec::new(); - for region_id in region_ids { - if !heartbeat_collected_region_ids.contains_key(region_id) { - non_collected_region_ids.push(*region_id); - } - } - non_collected_region_ids -} - #[cfg(test)] mod tests { use std::assert_matches::assert_matches; use common_wal::maybe_skip_kafka_integration_test; use common_wal::test_util::get_kafka_endpoints; + use rskafka::client::partition::UnknownTopicHandling; use rskafka::record::Record; use super::*; @@ -334,23 +212,23 @@ mod tests { /// Including: /// 1. Prepare some data in the table metadata manager and in-memory kv backend. /// 2. Return the procedure, the minimum last entry id to prune and the regions to flush. - async fn mock_test_data(procedure: &WalPruneProcedure) -> u64 { + async fn mock_test_data(context: Context, topic: &str) -> u64 { let n_region = 10; let n_table = 5; // 5 entries per region. let offsets = mock_wal_entries( - procedure.context.client.clone(), - &procedure.data.topic, + context.client.clone(), + topic, (n_region * n_table * 5) as usize, ) .await; let prunable_entry_id = new_wal_prune_metadata( - procedure.context.table_metadata_manager.clone(), - procedure.context.leader_region_registry.clone(), + context.table_metadata_manager.clone(), + context.leader_region_registry.clone(), n_region, n_table, &offsets, - procedure.data.topic.clone(), + topic.to_string(), ) .await; prunable_entry_id @@ -439,24 +317,13 @@ mod tests { topic_name = format!("test_procedure_execution-{}", topic_name); let env = TestEnv::new(); let context = env.build_wal_prune_context(broker_endpoints).await; + // Prepare the topic. TestEnv::prepare_topic(&context.client, &topic_name).await; - let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, None); - let prunable_entry_id = mock_test_data(&procedure).await; - - // Step 1: Test `on_prepare`. 
- let status = procedure.on_prepare().await.unwrap(); - assert_matches!( - status, - Status::Executing { - persist: false, - clean_poisons: false - } - ); - assert_matches!(procedure.data.state, WalPruneState::Prune); - assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id); - - // Step 2: Test `on_prune`. + // Mock the test data. + let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await; + let mut procedure = + WalPruneProcedure::new(context.clone(), None, topic_name.clone(), prunable_entry_id); let status = procedure.on_prune().await.unwrap(); assert_matches!(status, Status::Done { output: None }); // Check if the entry ids after(include) `prunable_entry_id` still exist. @@ -484,13 +351,6 @@ mod tests { .unwrap() .unwrap(); assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id); - - // Step 3: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails. - // Should log a warning and return `Status::Done`. - procedure.context.leader_region_registry.reset(); - let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Done { output: None }); - // Clean up the topic. delete_topic(procedure.context.client, &topic_name).await; } diff --git a/src/meta-srv/src/procedure/wal_prune/manager.rs b/src/meta-srv/src/procedure/wal_prune/manager.rs index 3ad29d29be..00fde36a85 100644 --- a/src/meta-srv/src/procedure/wal_prune/manager.rs +++ b/src/meta-srv/src/procedure/wal_prune/manager.rs @@ -17,9 +17,8 @@ use std::collections::HashSet; use std::fmt::{Debug, Formatter}; use std::sync::{Arc, RwLock}; -use common_meta::key::TableMetadataManagerRef; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; -use common_telemetry::{error, info, warn}; +use common_telemetry::{debug, error, info, warn}; use futures::future::join_all; use snafu::{OptionExt, ResultExt}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -28,6 +27,7 @@ use tokio::sync::Semaphore; use crate::define_ticker; use crate::error::{self, Result}; use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE; +use crate::procedure::wal_prune::utils::{find_pruneable_entry_id_for_topic, should_trigger_prune}; use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure}; pub type WalPruneTickerRef = Arc; @@ -109,8 +109,6 @@ define_ticker!( /// 2. Periodically receive [Event::Tick] to submit [WalPruneProcedure] to prune remote WAL. /// 3. Use a semaphore to limit the number of concurrent [WalPruneProcedure]s. pub(crate) struct WalPruneManager { - /// Table metadata manager to restore topics from kvbackend. - table_metadata_manager: TableMetadataManagerRef, /// Receives [Event]s. receiver: Receiver, /// Procedure manager. @@ -125,23 +123,21 @@ pub(crate) struct WalPruneManager { } impl WalPruneManager { - /// Returns a new empty [WalPruneManager]. + /// Returns a new empty [`WalPruneManager`]. pub fn new( - table_metadata_manager: TableMetadataManagerRef, - limit: usize, + parallelism: usize, receiver: Receiver, procedure_manager: ProcedureManagerRef, wal_prune_context: WalPruneContext, ) -> Self { Self { - table_metadata_manager, receiver, procedure_manager, wal_prune_context, tracker: WalPruneProcedureTracker { running_procedures: Arc::new(RwLock::new(HashSet::new())), }, - semaphore: Arc::new(Semaphore::new(limit)), + semaphore: Arc::new(Semaphore::new(parallelism)), } } @@ -186,16 +182,21 @@ impl WalPruneManager { } /// Submits a [WalPruneProcedure] for the given topic name. 
-    pub async fn submit_procedure(&self, topic_name: &str) -> Result<ProcedureId> {
+    pub async fn wait_procedure(
+        &self,
+        topic_name: &str,
+        prunable_entry_id: u64,
+    ) -> Result<ProcedureId> {
         let guard = self
             .tracker
             .insert_running_procedure(topic_name.to_string())
             .with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
 
         let procedure = WalPruneProcedure::new(
-            topic_name.to_string(),
             self.wal_prune_context.clone(),
             Some(guard),
+            topic_name.to_string(),
+            prunable_entry_id,
         );
         let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
         let procedure_id = procedure_with_id.id;
@@ -214,13 +215,48 @@ impl WalPruneManager {
         Ok(procedure_id)
     }
 
+    async fn try_prune(&self, topic_name: &str) -> Result<()> {
+        let table_metadata_manager = self.wal_prune_context.table_metadata_manager.clone();
+        let leader_region_registry = self.wal_prune_context.leader_region_registry.clone();
+        let prunable_entry_id = find_pruneable_entry_id_for_topic(
+            &table_metadata_manager,
+            &leader_region_registry,
+            topic_name,
+        )
+        .await?;
+        let Some(prunable_entry_id) = prunable_entry_id else {
+            debug!(
+                "No prunable entry id found for topic {}, skipping prune",
+                topic_name
+            );
+            return Ok(());
+        };
+        let current = table_metadata_manager
+            .topic_name_manager()
+            .get(topic_name)
+            .await
+            .context(error::TableMetadataManagerSnafu)?
+            .map(|v| v.into_inner().pruned_entry_id);
+        if !should_trigger_prune(current, prunable_entry_id) {
+            debug!(
+                "No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}",
+                topic_name, current, prunable_entry_id
+            );
+            return Ok(());
+        }
+
+        self.wait_procedure(topic_name, prunable_entry_id)
+            .await
+            .map(|_| ())
+    }
+
     async fn handle_tick_request(&self) -> Result<()> {
         let topics = self.retrieve_sorted_topics().await?;
         let mut tasks = Vec::with_capacity(topics.len());
         for topic_name in topics.iter() {
             tasks.push(async {
                 let _permit = self.semaphore.acquire().await.unwrap();
-                match self.submit_procedure(topic_name).await {
+                match self.try_prune(topic_name).await {
                     Ok(_) => {}
                     Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
                         warn!("Prune task for topic {} is already running", topic);
@@ -244,7 +280,8 @@ impl WalPruneManager {
     /// Since [WalPruneManager] submits procedures depending on the order of the topics, we should sort the topics.
     /// TODO(CookiePie): Can register topics in memory instead of retrieving from the table metadata manager every time.
async fn retrieve_sorted_topics(&self) -> Result> { - self.table_metadata_manager + self.wal_prune_context + .table_metadata_manager .topic_name_manager() .range() .await @@ -264,6 +301,7 @@ mod test { use tokio::time::{sleep, timeout}; use super::*; + use crate::procedure::test_util::new_wal_prune_metadata; use crate::procedure::wal_prune::test_util::TestEnv; #[tokio::test] @@ -315,7 +353,6 @@ mod test { ( tx, WalPruneManager::new( - test_env.table_metadata_manager.clone(), limit, rx, test_env.procedure_manager.clone(), @@ -330,6 +367,7 @@ mod test { .map(|topic| TopicNameKey::new(topic)) .collect::>(); manager + .wal_prune_context .table_metadata_manager .topic_name_manager() .batch_put(topic_name_keys) @@ -359,4 +397,41 @@ mod test { .await .unwrap(); } + + #[tokio::test] + async fn test_find_pruneable_entry_id_for_topic_none() { + let test_env = TestEnv::new(); + let prunable_entry_id = find_pruneable_entry_id_for_topic( + &test_env.table_metadata_manager, + &test_env.leader_region_registry, + "test_topic", + ) + .await + .unwrap(); + assert!(prunable_entry_id.is_none()); + } + + #[tokio::test] + async fn test_find_pruneable_entry_id_for_topic_some() { + let test_env = TestEnv::new(); + let topic = "test_topic"; + let expected_prunable_entry_id = new_wal_prune_metadata( + test_env.table_metadata_manager.clone(), + test_env.leader_region_registry.clone(), + 2, + 5, + &[3, 10, 23, 50, 52, 82, 130], + topic.to_string(), + ) + .await; + let prunable_entry_id = find_pruneable_entry_id_for_topic( + &test_env.table_metadata_manager, + &test_env.leader_region_registry, + topic, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(prunable_entry_id, expected_prunable_entry_id); + } } diff --git a/src/meta-srv/src/procedure/wal_prune/utils.rs b/src/meta-srv/src/procedure/wal_prune/utils.rs new file mode 100644 index 0000000000..d71f2867dd --- /dev/null +++ b/src/meta-srv/src/procedure/wal_prune/utils.rs @@ -0,0 +1,254 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use common_meta::key::TableMetadataManagerRef; +use common_meta::region_registry::LeaderRegionRegistryRef; +use common_telemetry::warn; +use itertools::{Itertools, MinMaxResult}; +use rskafka::client::partition::{OffsetAt, PartitionClient, UnknownTopicHandling}; +use rskafka::client::Client; +use snafu::ResultExt; +use store_api::storage::RegionId; + +use crate::error::{ + BuildPartitionClientSnafu, DeleteRecordsSnafu, GetOffsetSnafu, Result, + TableMetadataManagerSnafu, UpdateTopicNameValueSnafu, +}; + +/// The default timeout for deleting records. +const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5); +/// The default partition. 
+const DEFAULT_PARTITION: i32 = 0;
+
+fn missing_region_ids(
+    all_region_ids: &[RegionId],
+    result_set: &HashMap<RegionId, u64>,
+) -> Vec<RegionId> {
+    let mut missing_region_ids = Vec::new();
+    for region_id in all_region_ids {
+        if !result_set.contains_key(region_id) {
+            missing_region_ids.push(*region_id);
+        }
+    }
+    missing_region_ids
+}
+
+/// Finds the prunable entry id for the topic.
+///
+/// Returns `None` if:
+/// - The topic has no region.
+/// - Some region info is missing from the heartbeat.
+pub(crate) async fn find_pruneable_entry_id_for_topic(
+    table_metadata_manager: &TableMetadataManagerRef,
+    leader_region_registry: &LeaderRegionRegistryRef,
+    topic: &str,
+) -> Result<Option<u64>> {
+    let region_ids = table_metadata_manager
+        .topic_region_manager()
+        .regions(topic)
+        .await
+        .context(TableMetadataManagerSnafu)?;
+    if region_ids.is_empty() {
+        return Ok(None);
+    }
+
+    // Get the prunable entry id for each region.
+    let prunable_entry_ids_map = leader_region_registry
+        .batch_get(region_ids.iter().cloned())
+        .into_iter()
+        .map(|(region_id, region)| {
+            let prunable_entry_id = region.manifest.prunable_entry_id();
+            (region_id, prunable_entry_id)
+        })
+        .collect();
+    let missing_region_ids = missing_region_ids(&region_ids, &prunable_entry_ids_map);
+    if !missing_region_ids.is_empty() {
+        warn!(
+            "Cannot determine prunable entry id: missing region info from heartbeat. Topic: {}, missing region ids: {:?}",
+            topic, missing_region_ids
+        );
+        return Ok(None);
+    }
+
+    let min_max_result = prunable_entry_ids_map.values().minmax();
+    match min_max_result {
+        MinMaxResult::NoElements => Ok(None),
+        MinMaxResult::OneElement(prunable_entry_id) => Ok(Some(*prunable_entry_id)),
+        MinMaxResult::MinMax(min_prunable_entry_id, _) => Ok(Some(*min_prunable_entry_id)),
+    }
+}
+
+/// Determines whether pruning should be triggered based on the current pruned entry id and the prunable entry id.
+/// Returns true if:
+/// - There is no current pruned entry id (i.e., pruning has never occurred).
+/// - The prunable entry id is greater than the current pruned entry id (i.e., there is something to prune).
+pub(crate) fn should_trigger_prune(current: Option<u64>, prunable_entry_id: u64) -> bool {
+    match current {
+        None => true, // No pruning has occurred yet, should trigger immediately.
+        Some(current) => prunable_entry_id > current,
+    }
+}
+
+/// Returns a partition client for the given topic.
+pub(crate) async fn get_partition_client(
+    client: &Arc<Client>,
+    topic: &str,
+) -> Result<PartitionClient> {
+    client
+        .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
+        .await
+        .context(BuildPartitionClientSnafu {
+            topic,
+            partition: DEFAULT_PARTITION,
+        })
+}
+
+/// Returns the earliest and latest offsets for the given topic.
+pub(crate) async fn get_offsets_for_topic(
+    partition_client: &PartitionClient,
+    topic: &str,
+) -> Result<(u64, u64)> {
+    let earliest_offset = partition_client
+        .get_offset(OffsetAt::Earliest)
+        .await
+        .context(GetOffsetSnafu { topic })?;
+    let latest_offset = partition_client
+        .get_offset(OffsetAt::Latest)
+        .await
+        .context(GetOffsetSnafu { topic })?;
+
+    Ok((earliest_offset as u64, latest_offset as u64))
+}
+
+/// Updates the pruned entry id for the given topic.
+pub(crate) async fn update_pruned_entry_id(
+    table_metadata_manager: &TableMetadataManagerRef,
+    topic: &str,
+    pruned_entry_id: u64,
+) -> Result<()> {
+    let prev = table_metadata_manager
+        .topic_name_manager()
+        .get(topic)
+        .await
+        .context(TableMetadataManagerSnafu)?;
+
+    table_metadata_manager
+        .topic_name_manager()
+        .update(topic, pruned_entry_id, prev)
+        .await
+        .context(UpdateTopicNameValueSnafu { topic })?;
+
+    Ok(())
+}
+
+/// Deletes the records for the given topic.
+pub(crate) async fn delete_records(
+    partition_client: &PartitionClient,
+    topic: &str,
+    pruned_entry_id: u64,
+) -> Result<()> {
+    partition_client
+        .delete_records(
+            // Note: no "+1" is needed here because the offset argument is exclusive;
+            // it's defensive programming in case somewhere else has an off-by-one error.
+            // See https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection),
+            // which we use to get the end offset from the high watermark.
+            pruned_entry_id as i64,
+            DELETE_RECORDS_TIMEOUT.as_millis() as i32,
+        )
+        .await
+        .context(DeleteRecordsSnafu {
+            topic,
+            partition: DEFAULT_PARTITION,
+            offset: pruned_entry_id,
+        })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_should_trigger_prune_none_current() {
+        // No pruning has occurred yet, should trigger
+        assert!(should_trigger_prune(None, 10));
+        assert!(should_trigger_prune(None, 0));
+    }
+
+    #[test]
+    fn test_should_trigger_prune_prunable_greater_than_current() {
+        // Prunable entry id is greater than current, should trigger
+        assert!(should_trigger_prune(Some(5), 6));
+        assert!(should_trigger_prune(Some(0), 1));
+        assert!(should_trigger_prune(Some(99), 100));
+    }
+
+    #[test]
+    fn test_should_not_trigger_prune_prunable_equal_to_current() {
+        // Prunable entry id is equal to current, should not trigger
+        assert!(!should_trigger_prune(Some(10), 10));
+        assert!(!should_trigger_prune(Some(0), 0));
+    }
+
+    #[test]
+    fn test_should_not_trigger_prune_prunable_less_than_current() {
+        // Prunable entry id is less than current, should not trigger
+        assert!(!should_trigger_prune(Some(10), 9));
+        assert!(!should_trigger_prune(Some(100), 99));
+    }
+
+    #[test]
+    fn test_missing_region_ids_none_missing() {
+        let all_region_ids = vec![RegionId::new(1, 1), RegionId::new(2, 2)];
+        let mut result_set = HashMap::new();
+        result_set.insert(RegionId::new(1, 1), 10);
+        result_set.insert(RegionId::new(2, 2), 20);
+        let missing = missing_region_ids(&all_region_ids, &result_set);
+        assert!(missing.is_empty());
+    }
+
+    #[test]
+    fn test_missing_region_ids_some_missing() {
+        let all_region_ids = vec![
+            RegionId::new(1, 1),
+            RegionId::new(2, 2),
+            RegionId::new(3, 3),
+        ];
+        let mut result_set = HashMap::new();
+        result_set.insert(RegionId::new(1, 1), 10);
+        let missing = missing_region_ids(&all_region_ids, &result_set);
+        assert_eq!(missing, vec![RegionId::new(2, 2), RegionId::new(3, 3)]);
+    }
+
+    #[test]
+    fn test_missing_region_ids_all_missing() {
+        let all_region_ids = vec![RegionId::new(1, 1), RegionId::new(2, 2)];
+        let result_set = HashMap::new();
+        let missing = missing_region_ids(&all_region_ids, &result_set);
+        assert_eq!(missing, all_region_ids);
+    }
+
+    #[test]
+    fn test_missing_region_ids_empty_all() {
+        let all_region_ids: Vec<RegionId> = vec![];
+        let mut result_set = HashMap::new();
+        result_set.insert(RegionId::new(1, 1), 10);
+        let missing = missing_region_ids(&all_region_ids, &result_set);
+        assert!(missing.is_empty());
+    }
+}
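
Note on the new control flow: on every tick the manager now computes a prunable entry id per topic and submits a WalPruneProcedure only when that id has advanced past the persisted pruned_entry_id. The following is a minimal, self-contained sketch of that decision, using plain u64 values and a hypothetical decide() helper rather than the actual meta-srv types:

    /// Mirrors `should_trigger_prune` from wal_prune/utils.rs: prune when nothing has
    /// been pruned yet, or when the prunable entry id moved past the persisted one.
    fn should_trigger_prune(current_pruned: Option<u64>, prunable_entry_id: u64) -> bool {
        match current_pruned {
            None => true,
            Some(current) => prunable_entry_id > current,
        }
    }

    /// Hypothetical helper showing the per-topic decision:
    /// 1. a topic can only be pruned up to the smallest prunable id among its regions,
    /// 2. a procedure is worth submitting only if that id moves the pruned entry id forward.
    fn decide(region_prunable_ids: &[u64], persisted_pruned_id: Option<u64>) -> Option<u64> {
        let prunable = *region_prunable_ids.iter().min()?;
        should_trigger_prune(persisted_pruned_id, prunable).then_some(prunable)
    }

    fn main() {
        // Regions report prunable entry ids 12, 30 and 7; 5 was pruned previously.
        assert_eq!(decide(&[12, 30, 7], Some(5)), Some(7));
        // Once the persisted pruned entry id catches up, nothing is submitted.
        assert_eq!(decide(&[12, 30, 7], Some(7)), None);
        // A topic with no region information never triggers a prune.
        assert_eq!(decide(&[], None), None);
    }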