refactor: simplify WAL Pruning procedure part2 (#6782)

refactor: simplify prune wal procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-21 19:09:31 +08:00
committed by GitHub
parent 896d72191e
commit 6692957e08
5 changed files with 407 additions and 218 deletions

View File

@@ -439,7 +439,6 @@ impl MetasrvBuilder {
leader_region_registry: leader_region_registry.clone(),
};
let wal_prune_manager = WalPruneManager::new(
table_metadata_manager.clone(),
remote_wal_options.auto_prune_parallelism,
rx,
procedure_manager.clone(),

View File

@@ -186,6 +186,7 @@ pub fn new_upgrade_region_reply(
}
}
/// Mock the test data for WAL pruning.
pub async fn new_wal_prune_metadata(
table_metadata_manager: TableMetadataManagerRef,
leader_region_registry: LeaderRegionRegistryRef,

View File

@@ -15,10 +15,9 @@
pub(crate) mod manager;
#[cfg(test)]
mod test_util;
pub(crate) mod utils;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
@@ -30,33 +29,20 @@ use common_procedure::{
Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::{info, warn};
use itertools::{Itertools, MinMaxResult};
use log_store::kafka::DEFAULT_PARTITION;
use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
use rskafka::client::partition::{OffsetAt, UnknownTopicHandling};
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use crate::error::{
self, BuildPartitionClientSnafu, DeleteRecordsSnafu, TableMetadataManagerSnafu,
UpdateTopicNameValueSnafu,
use crate::error::{self};
use crate::procedure::wal_prune::utils::{
delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id,
};
use crate::Result;
pub type KafkaClientRef = Arc<Client>;
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);
/// The state of WAL pruning.
#[derive(Debug, Serialize, Deserialize)]
pub enum WalPruneState {
Prepare,
Prune,
}
#[derive(Clone)]
pub struct Context {
/// The Kafka client.
@@ -74,8 +60,6 @@ pub struct WalPruneData {
pub topic: String,
/// The minimum flush entry id for topic, which is used to prune the WAL.
pub prunable_entry_id: EntryId,
/// The state.
pub state: WalPruneState,
}
/// The procedure to prune WAL.
@@ -88,12 +72,16 @@ pub struct WalPruneProcedure {
impl WalPruneProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
pub fn new(topic: String, context: Context, guard: Option<WalPruneProcedureGuard>) -> Self {
pub fn new(
context: Context,
guard: Option<WalPruneProcedureGuard>,
topic: String,
prunable_entry_id: u64,
) -> Self {
Self {
data: WalPruneData {
topic,
prunable_entry_id: 0,
state: WalPruneState::Prepare,
prunable_entry_id,
},
context,
_guard: guard,
@@ -114,150 +102,59 @@ impl WalPruneProcedure {
})
}
/// Prepare the entry id to prune and regions to flush.
///
/// Retry:
/// - Failed to retrieve any metadata.
pub async fn on_prepare(&mut self) -> Result<Status> {
let region_ids = self
.context
.table_metadata_manager
.topic_region_manager()
.regions(&self.data.topic)
.await
.context(TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to get topic-region map",
})?;
let prunable_entry_ids_map: HashMap<_, _> = self
.context
.leader_region_registry
.batch_get(region_ids.iter().cloned())
.into_iter()
.map(|(region_id, region)| {
let prunable_entry_id = region.manifest.prunable_entry_id();
(region_id, prunable_entry_id)
})
.collect();
// Check if the `prunable_entry_ids_map` contains all region ids.
let non_collected_region_ids =
check_heartbeat_collected_region_ids(&region_ids, &prunable_entry_ids_map);
if !non_collected_region_ids.is_empty() {
// The heartbeat collected region ids do not contain all region ids in the topic-region map.
// In this case, we should not prune the WAL.
warn!("The heartbeat collected region ids do not contain all region ids in the topic-region map. Aborting the WAL prune procedure.
topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids);
return Ok(Status::done());
}
let min_max_result = prunable_entry_ids_map.values().minmax();
match min_max_result {
MinMaxResult::NoElements => {
return Ok(Status::done());
}
MinMaxResult::OneElement(prunable_entry_id) => {
self.data.prunable_entry_id = *prunable_entry_id;
}
MinMaxResult::MinMax(min_prunable_entry_id, _) => {
self.data.prunable_entry_id = *min_prunable_entry_id;
}
};
self.data.state = WalPruneState::Prune;
Ok(Status::executing(false))
}
/// Prune the WAL and persist the minimum prunable entry id.
///
/// Retry:
/// - Failed to update the minimum prunable entry id in kvbackend.
/// - Failed to delete records.
pub async fn on_prune(&mut self) -> Result<Status> {
let partition_client = self
.context
.client
.partition_client(
self.data.topic.clone(),
DEFAULT_PARTITION,
UnknownTopicHandling::Retry,
)
.await
.context(BuildPartitionClientSnafu {
topic: self.data.topic.clone(),
partition: DEFAULT_PARTITION,
})?;
let earliest_offset = partition_client
.get_offset(OffsetAt::Earliest)
.await
.context(error::GetOffsetSnafu {
topic: self.data.topic.clone(),
})?;
let latest_offset = partition_client
.get_offset(OffsetAt::Latest)
.await
.context(error::GetOffsetSnafu {
topic: self.data.topic.clone(),
})?;
if self.data.prunable_entry_id <= earliest_offset as u64 {
let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
let (earliest_offset, latest_offset) =
get_offsets_for_topic(&partition_client, &self.data.topic).await?;
if self.data.prunable_entry_id <= earliest_offset {
warn!(
"The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}",
self.data.topic, self.data.prunable_entry_id, earliest_offset as u64
"The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}",
self.data.topic,
self.data.prunable_entry_id,
earliest_offset,
latest_offset
);
return Ok(Status::done());
}
// Should update the min prunable entry id in the kv backend before deleting records.
// Otherwise, when a datanode restarts, it will not be able to find the wal entries.
let prev = self
.context
.table_metadata_manager
.topic_name_manager()
.get(&self.data.topic)
.await
.context(TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TopicNameValue, topic: {}", self.data.topic),
})?;
self.context
.table_metadata_manager
.topic_name_manager()
.update(&self.data.topic, self.data.prunable_entry_id, prev)
.await
.context(UpdateTopicNameValueSnafu {
topic: &self.data.topic,
})
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update pruned entry id for topic: {}",
self.data.topic
),
})?;
partition_client
.delete_records(
// notice here no "+1" is needed because the offset arg is exclusive, and it's defensive programming just in case somewhere else have a off by one error, see https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection) which we use to get the end offset from high watermark
self.data.prunable_entry_id as i64,
DELETE_RECORDS_TIMEOUT.as_millis() as i32,
)
.await
.context(DeleteRecordsSnafu {
topic: &self.data.topic,
partition: DEFAULT_PARTITION,
offset: self.data.prunable_entry_id,
})
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}",
self.data.topic, self.data.prunable_entry_id, latest_offset as u64
),
})?;
// Delete records.
delete_records(
&partition_client,
&self.data.topic,
self.data.prunable_entry_id,
)
.await
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}",
self.data.topic, self.data.prunable_entry_id, latest_offset
),
})?;
// Update the pruned entry id for the topic.
update_pruned_entry_id(
&self.context.table_metadata_manager,
&self.data.topic,
self.data.prunable_entry_id,
)
.await
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update pruned entry id for topic: {}",
self.data.topic
),
})?;
info!(
"Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
self.data.topic, self.data.prunable_entry_id, latest_offset as u64
self.data.topic, self.data.prunable_entry_id, latest_offset
);
Ok(Status::done())
}
@@ -274,13 +171,7 @@ impl Procedure for WalPruneProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
match state {
WalPruneState::Prepare => self.on_prepare().await,
WalPruneState::Prune => self.on_prune().await,
}
.map_err(|e| {
self.on_prune().await.map_err(|e| {
if e.is_retryable() {
ProcedureError::retry_later(e)
} else {
@@ -303,26 +194,13 @@ impl Procedure for WalPruneProcedure {
}
}
/// Check if the heartbeat collected region ids contains all region ids in the topic-region map.
fn check_heartbeat_collected_region_ids(
region_ids: &[RegionId],
heartbeat_collected_region_ids: &HashMap<RegionId, u64>,
) -> Vec<RegionId> {
let mut non_collected_region_ids = Vec::new();
for region_id in region_ids {
if !heartbeat_collected_region_ids.contains_key(region_id) {
non_collected_region_ids.push(*region_id);
}
}
non_collected_region_ids
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::record::Record;
use super::*;
@@ -334,23 +212,23 @@ mod tests {
/// Including:
/// 1. Prepare some data in the table metadata manager and in-memory kv backend.
/// 2. Return the prunable entry id computed for the mocked regions.
async fn mock_test_data(procedure: &WalPruneProcedure) -> u64 {
async fn mock_test_data(context: Context, topic: &str) -> u64 {
let n_region = 10;
let n_table = 5;
// 5 entries per region.
let offsets = mock_wal_entries(
procedure.context.client.clone(),
&procedure.data.topic,
context.client.clone(),
topic,
(n_region * n_table * 5) as usize,
)
.await;
let prunable_entry_id = new_wal_prune_metadata(
procedure.context.table_metadata_manager.clone(),
procedure.context.leader_region_registry.clone(),
context.table_metadata_manager.clone(),
context.leader_region_registry.clone(),
n_region,
n_table,
&offsets,
procedure.data.topic.clone(),
topic.to_string(),
)
.await;
prunable_entry_id
@@ -439,24 +317,13 @@ mod tests {
topic_name = format!("test_procedure_execution-{}", topic_name);
let env = TestEnv::new();
let context = env.build_wal_prune_context(broker_endpoints).await;
// Prepare the topic.
TestEnv::prepare_topic(&context.client, &topic_name).await;
let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, None);
let prunable_entry_id = mock_test_data(&procedure).await;
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
Status::Executing {
persist: false,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
// Step 2: Test `on_prune`.
// Mock the test data.
let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;
let mut procedure =
WalPruneProcedure::new(context.clone(), None, topic_name.clone(), prunable_entry_id);
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check if the entry ids after(include) `prunable_entry_id` still exist.
@@ -484,13 +351,6 @@ mod tests {
.unwrap()
.unwrap();
assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
// Step 3: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
procedure.context.leader_region_registry.reset();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;
}

View File

@@ -17,9 +17,8 @@ use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, RwLock};
use common_meta::key::TableMetadataManagerRef;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info, warn};
use common_telemetry::{debug, error, info, warn};
use futures::future::join_all;
use snafu::{OptionExt, ResultExt};
use tokio::sync::mpsc::{Receiver, Sender};
@@ -28,6 +27,7 @@ use tokio::sync::Semaphore;
use crate::define_ticker;
use crate::error::{self, Result};
use crate::metrics::METRIC_META_REMOTE_WAL_PRUNE_EXECUTE;
use crate::procedure::wal_prune::utils::{find_pruneable_entry_id_for_topic, should_trigger_prune};
use crate::procedure::wal_prune::{Context as WalPruneContext, WalPruneProcedure};
pub type WalPruneTickerRef = Arc<WalPruneTicker>;
@@ -109,8 +109,6 @@ define_ticker!(
/// 2. Periodically receive [Event::Tick] to submit [WalPruneProcedure] to prune remote WAL.
/// 3. Use a semaphore to limit the number of concurrent [WalPruneProcedure]s.
pub(crate) struct WalPruneManager {
/// Table metadata manager to restore topics from kvbackend.
table_metadata_manager: TableMetadataManagerRef,
/// Receives [Event]s.
receiver: Receiver<Event>,
/// Procedure manager.
@@ -125,23 +123,21 @@ pub(crate) struct WalPruneManager {
}
impl WalPruneManager {
/// Returns a new empty [WalPruneManager].
/// Returns a new empty [`WalPruneManager`].
pub fn new(
table_metadata_manager: TableMetadataManagerRef,
limit: usize,
parallelism: usize,
receiver: Receiver<Event>,
procedure_manager: ProcedureManagerRef,
wal_prune_context: WalPruneContext,
) -> Self {
Self {
table_metadata_manager,
receiver,
procedure_manager,
wal_prune_context,
tracker: WalPruneProcedureTracker {
running_procedures: Arc::new(RwLock::new(HashSet::new())),
},
semaphore: Arc::new(Semaphore::new(limit)),
semaphore: Arc::new(Semaphore::new(parallelism)),
}
}
@@ -186,16 +182,21 @@ impl WalPruneManager {
}
/// Submits a [WalPruneProcedure] for the given topic name.
pub async fn submit_procedure(&self, topic_name: &str) -> Result<ProcedureId> {
pub async fn wait_procedure(
&self,
topic_name: &str,
prunable_entry_id: u64,
) -> Result<ProcedureId> {
let guard = self
.tracker
.insert_running_procedure(topic_name.to_string())
.with_context(|| error::PruneTaskAlreadyRunningSnafu { topic: topic_name })?;
let procedure = WalPruneProcedure::new(
topic_name.to_string(),
self.wal_prune_context.clone(),
Some(guard),
topic_name.to_string(),
prunable_entry_id,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
@@ -214,13 +215,48 @@ impl WalPruneManager {
Ok(procedure_id)
}
/// Submits a prune procedure for `topic_name` when there is something new to prune.
///
/// Steps:
/// 1. Look up the prunable entry id for the topic; if none can be determined, skip.
/// 2. Read the pruned entry id currently stored for the topic.
/// 3. If [`should_trigger_prune`] approves, hand the work to `wait_procedure`.
///
/// Returns `Ok(())` both when a procedure was run and when pruning was skipped.
async fn try_prune(&self, topic_name: &str) -> Result<()> {
    let context = &self.wal_prune_context;

    let Some(prunable_entry_id) = find_pruneable_entry_id_for_topic(
        &context.table_metadata_manager,
        &context.leader_region_registry,
        topic_name,
    )
    .await?
    else {
        debug!(
            "No prunable entry id found for topic {}, skipping prune",
            topic_name
        );
        return Ok(());
    };

    // The stored value is `None` when no value exists for the topic yet.
    let current = context
        .table_metadata_manager
        .topic_name_manager()
        .get(topic_name)
        .await
        .context(error::TableMetadataManagerSnafu)?
        .map(|v| v.into_inner().pruned_entry_id);

    if should_trigger_prune(current, prunable_entry_id) {
        self.wait_procedure(topic_name, prunable_entry_id).await?;
    } else {
        debug!(
            "No need to prune topic {}, current pruned entry id: {:?}, prunable entry id: {}",
            topic_name, current, prunable_entry_id
        );
    }
    Ok(())
}
async fn handle_tick_request(&self) -> Result<()> {
let topics = self.retrieve_sorted_topics().await?;
let mut tasks = Vec::with_capacity(topics.len());
for topic_name in topics.iter() {
tasks.push(async {
let _permit = self.semaphore.acquire().await.unwrap();
match self.submit_procedure(topic_name).await {
match self.try_prune(topic_name).await {
Ok(_) => {}
Err(error::Error::PruneTaskAlreadyRunning { topic, .. }) => {
warn!("Prune task for topic {} is already running", topic);
@@ -244,7 +280,8 @@ impl WalPruneManager {
/// Since [WalPruneManager] submits procedures depending on the order of the topics, we should sort the topics.
/// TODO(CookiePie): Can register topics in memory instead of retrieving from the table metadata manager every time.
async fn retrieve_sorted_topics(&self) -> Result<Vec<String>> {
self.table_metadata_manager
self.wal_prune_context
.table_metadata_manager
.topic_name_manager()
.range()
.await
@@ -264,6 +301,7 @@ mod test {
use tokio::time::{sleep, timeout};
use super::*;
use crate::procedure::test_util::new_wal_prune_metadata;
use crate::procedure::wal_prune::test_util::TestEnv;
#[tokio::test]
@@ -315,7 +353,6 @@ mod test {
(
tx,
WalPruneManager::new(
test_env.table_metadata_manager.clone(),
limit,
rx,
test_env.procedure_manager.clone(),
@@ -330,6 +367,7 @@ mod test {
.map(|topic| TopicNameKey::new(topic))
.collect::<Vec<_>>();
manager
.wal_prune_context
.table_metadata_manager
.topic_name_manager()
.batch_put(topic_name_keys)
@@ -359,4 +397,41 @@ mod test {
.await
.unwrap();
}
#[tokio::test]
async fn test_find_pruneable_entry_id_for_topic_none() {
    // No metadata is written for the topic before the lookup, so nothing is prunable.
    let env = TestEnv::new();
    let found = find_pruneable_entry_id_for_topic(
        &env.table_metadata_manager,
        &env.leader_region_registry,
        "test_topic",
    )
    .await
    .unwrap();
    assert_eq!(found, None);
}
#[tokio::test]
async fn test_find_pruneable_entry_id_for_topic_some() {
    const TOPIC: &str = "test_topic";
    let env = TestEnv::new();

    // Populate the mocked metadata; the helper reports the entry id we expect to find.
    let offsets = [3, 10, 23, 50, 52, 82, 130];
    let expected = new_wal_prune_metadata(
        env.table_metadata_manager.clone(),
        env.leader_region_registry.clone(),
        2,
        5,
        &offsets,
        TOPIC.to_string(),
    )
    .await;

    let found = find_pruneable_entry_id_for_topic(
        &env.table_metadata_manager,
        &env.leader_region_registry,
        TOPIC,
    )
    .await
    .unwrap();
    assert_eq!(found.unwrap(), expected);
}
}

View File

@@ -0,0 +1,254 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_telemetry::warn;
use itertools::{Itertools, MinMaxResult};
use rskafka::client::partition::{OffsetAt, PartitionClient, UnknownTopicHandling};
use rskafka::client::Client;
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::error::{
BuildPartitionClientSnafu, DeleteRecordsSnafu, GetOffsetSnafu, Result,
TableMetadataManagerSnafu, UpdateTopicNameValueSnafu,
};
/// The default timeout for deleting records.
///
/// Converted to milliseconds at the call site in `delete_records`.
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);
/// The default partition.
///
/// Every partition client built by this module targets this partition.
const DEFAULT_PARTITION: i32 = 0;
/// Returns the ids from `all_region_ids` that have no entry in `result_set`.
///
/// The output keeps the order of `all_region_ids`; keys that only exist in
/// `result_set` are ignored.
fn missing_region_ids(
    all_region_ids: &[RegionId],
    result_set: &HashMap<RegionId, u64>,
) -> Vec<RegionId> {
    all_region_ids
        .iter()
        .filter(|region_id| !result_set.contains_key(*region_id))
        .copied()
        .collect()
}
/// Finds the prunable entry id for the topic.
///
/// Returns `None` if:
/// - The topic has no region.
/// - Some region info is missing from heartbeat.
pub(crate) async fn find_pruneable_entry_id_for_topic(
table_metadata_manager: &TableMetadataManagerRef,
leader_region_registry: &LeaderRegionRegistryRef,
topic: &str,
) -> Result<Option<u64>> {
let region_ids = table_metadata_manager
.topic_region_manager()
.regions(topic)
.await
.context(TableMetadataManagerSnafu)?;
if region_ids.is_empty() {
return Ok(None);
}
// Get the prunable entry id for each region.
let prunable_entry_ids_map = leader_region_registry
.batch_get(region_ids.iter().cloned())
.into_iter()
.map(|(region_id, region)| {
let prunable_entry_id = region.manifest.prunable_entry_id();
(region_id, prunable_entry_id)
})
.collect();
let missing_region_ids = missing_region_ids(&region_ids, &prunable_entry_ids_map);
if !missing_region_ids.is_empty() {
warn!(
"Cannot determine prunable entry id: missing region info from heartbeat. Topic: {}, missing region ids: {:?}",
topic, missing_region_ids
);
return Ok(None);
}
let min_max_result = prunable_entry_ids_map.values().minmax();
match min_max_result {
MinMaxResult::NoElements => Ok(None),
MinMaxResult::OneElement(prunable_entry_id) => Ok(Some(*prunable_entry_id)),
MinMaxResult::MinMax(min_prunable_entry_id, _) => Ok(Some(*min_prunable_entry_id)),
}
}
/// Determines whether pruning should be triggered based on the current pruned entry id
/// and the prunable entry id.
///
/// Returns `true` if:
/// - There is no current pruned entry id (i.e., pruning has never occurred).
/// - The prunable entry id is strictly greater than the current pruned entry id
///   (i.e., there is something new to prune).
///
/// Returns `false` when the prunable entry id is less than or equal to the current
/// pruned entry id.
pub(crate) fn should_trigger_prune(current: Option<u64>, prunable_entry_id: u64) -> bool {
    match current {
        // No pruning has occurred yet, should trigger immediately.
        None => true,
        // Only trigger when the prunable entry id has advanced past what was pruned.
        Some(pruned_entry_id) => prunable_entry_id > pruned_entry_id,
    }
}
/// Returns a partition client for the given topic.
///
/// The client is built for [`DEFAULT_PARTITION`] with `UnknownTopicHandling::Retry`.
///
/// # Errors
///
/// Fails when the Kafka client cannot build the partition client.
pub(crate) async fn get_partition_client(
    client: &Arc<Client>,
    topic: &str,
) -> Result<PartitionClient> {
    let partition = DEFAULT_PARTITION;
    client
        .partition_client(topic, partition, UnknownTopicHandling::Retry)
        .await
        .context(BuildPartitionClientSnafu { topic, partition })
}
/// Returns the earliest and latest offsets for the given topic.
///
/// The tuple is `(earliest, latest)`; the earliest offset is fetched first.
///
/// # Errors
///
/// Fails when either offset cannot be fetched from the partition client.
pub(crate) async fn get_offsets_for_topic(
    partition_client: &PartitionClient,
    topic: &str,
) -> Result<(u64, u64)> {
    // NOTE(review): the raw offsets are converted with `as`, which would wrap a
    // negative value; presumably the broker never reports one, confirm.
    let earliest = partition_client
        .get_offset(OffsetAt::Earliest)
        .await
        .context(GetOffsetSnafu { topic })? as u64;
    let latest = partition_client
        .get_offset(OffsetAt::Latest)
        .await
        .context(GetOffsetSnafu { topic })? as u64;
    Ok((earliest, latest))
}
/// Updates the pruned entry id for the given topic.
///
/// The value currently stored for the topic is read first and passed to the
/// update together with the new `pruned_entry_id`.
///
/// # Errors
///
/// Fails when the current value cannot be read or the update is rejected.
pub(crate) async fn update_pruned_entry_id(
    table_metadata_manager: &TableMetadataManagerRef,
    topic: &str,
    pruned_entry_id: u64,
) -> Result<()> {
    let topic_name_manager = table_metadata_manager.topic_name_manager();

    let previous = topic_name_manager
        .get(topic)
        .await
        .context(TableMetadataManagerSnafu)?;

    topic_name_manager
        .update(topic, pruned_entry_id, previous)
        .await
        .context(UpdateTopicNameValueSnafu { topic })?;

    Ok(())
}
/// Deletes the records for the given topic.
///
/// Records are deleted up to `pruned_entry_id`, using [`DELETE_RECORDS_TIMEOUT`]
/// as the request timeout.
///
/// # Errors
///
/// Fails when the partition client reports an error for the delete request.
pub(crate) async fn delete_records(
    partition_client: &PartitionClient,
    topic: &str,
    pruned_entry_id: u64,
) -> Result<()> {
    // Note: no "+1" is needed here because the offset argument is exclusive.
    // This is also defensive programming in case there is an off-by-one error
    // somewhere else; see
    // https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection)
    // which we use to get the end offset from the high watermark.
    let offset = pruned_entry_id as i64;
    let timeout_ms = DELETE_RECORDS_TIMEOUT.as_millis() as i32;

    partition_client
        .delete_records(offset, timeout_ms)
        .await
        .context(DeleteRecordsSnafu {
            topic,
            partition: DEFAULT_PARTITION,
            offset: pruned_entry_id,
        })
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_should_trigger_prune_none_current() {
        // Nothing has been pruned yet, so any prunable entry id triggers.
        for prunable in [10, 0] {
            assert!(should_trigger_prune(None, prunable));
        }
    }

    #[test]
    fn test_should_trigger_prune_prunable_greater_than_current() {
        // The prunable entry id is ahead of the current one, so pruning triggers.
        for (current, prunable) in [(5, 6), (0, 1), (99, 100)] {
            assert!(should_trigger_prune(Some(current), prunable));
        }
    }

    #[test]
    fn test_should_not_trigger_prune_prunable_equal_to_current() {
        // The prunable entry id matches the current one, so there is nothing to do.
        for id in [10, 0] {
            assert!(!should_trigger_prune(Some(id), id));
        }
    }

    #[test]
    fn test_should_not_trigger_prune_prunable_less_than_current() {
        // The prunable entry id is behind the current one, so there is nothing to do.
        for (current, prunable) in [(10, 9), (100, 99)] {
            assert!(!should_trigger_prune(Some(current), prunable));
        }
    }

    #[test]
    fn test_missing_region_ids_none_missing() {
        let all_region_ids = vec![RegionId::new(1, 1), RegionId::new(2, 2)];
        let result_set = HashMap::from([(RegionId::new(1, 1), 10), (RegionId::new(2, 2), 20)]);
        assert!(missing_region_ids(&all_region_ids, &result_set).is_empty());
    }

    #[test]
    fn test_missing_region_ids_some_missing() {
        let all_region_ids = vec![
            RegionId::new(1, 1),
            RegionId::new(2, 2),
            RegionId::new(3, 3),
        ];
        let result_set = HashMap::from([(RegionId::new(1, 1), 10)]);
        let expected = vec![RegionId::new(2, 2), RegionId::new(3, 3)];
        assert_eq!(missing_region_ids(&all_region_ids, &result_set), expected);
    }

    #[test]
    fn test_missing_region_ids_all_missing() {
        let all_region_ids = vec![RegionId::new(1, 1), RegionId::new(2, 2)];
        let result_set = HashMap::new();
        assert_eq!(
            missing_region_ids(&all_region_ids, &result_set),
            all_region_ids
        );
    }

    #[test]
    fn test_missing_region_ids_empty_all() {
        // Keys that only exist in the result set are never reported as missing.
        let all_region_ids: Vec<RegionId> = Vec::new();
        let result_set = HashMap::from([(RegionId::new(1, 1), 10)]);
        assert!(missing_region_ids(&all_region_ids, &result_set).is_empty());
    }
}