feat(remote-wal): add remote wal prune procedure (#5714)

* feat: add remote wal prune procedure

* feat: add retry logic and remove rollback

* chore: simplify the logic

* fix: remove REMOTE_WAL_LOCK

* fix: use in-memory kv

* perf: O(n) judgement

* chore: add single write lock

* test: add unit test

* chore: remove unused function

* chore: update comments

* chore: apply comments

* chore: apply comments
Author: Yuhan Wang
Date: 2025-04-03 16:11:51 +08:00 (committed by GitHub)
parent 2acecd3620
commit f73b61e767
14 changed files with 643 additions and 8 deletions

Cargo.lock (generated)

@@ -6720,12 +6720,14 @@ dependencies = [
"itertools 0.14.0",
"lazy_static",
"local-ip-address",
"log-store",
"once_cell",
"parking_lot 0.12.3",
"prometheus",
"prost 0.13.3",
"rand 0.9.0",
"regex",
"rskafka",
"serde",
"serde_json",
"servers",


@@ -512,6 +512,10 @@ impl TableMetadataManager {
&self.table_route_manager
}
pub fn topic_region_manager(&self) -> &TopicRegionManager {
&self.topic_region_manager
}
#[cfg(feature = "testing")]
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend


@@ -27,6 +27,7 @@ const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock";
const REGION_LOCK_PREFIX: &str = "__region_lock";
const FLOW_LOCK_PREFIX: &str = "__flow_lock";
const REMOTE_WAL_LOCK_PREFIX: &str = "__remote_wal_lock";
/// [CatalogLock] acquires the lock on the tenant level.
pub enum CatalogLock<'a> {
@@ -231,6 +232,31 @@ impl From<FlowLock> for StringKey {
}
}
/// [RemoteWalLock] acquires the lock on the remote wal topic level.
pub enum RemoteWalLock {
Read(String),
Write(String),
}
impl Display for RemoteWalLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key = match self {
RemoteWalLock::Read(s) => s,
RemoteWalLock::Write(s) => s,
};
write!(f, "{}/{}", REMOTE_WAL_LOCK_PREFIX, key)
}
}
impl From<RemoteWalLock> for StringKey {
fn from(value: RemoteWalLock) -> Self {
match value {
RemoteWalLock::Write(_) => StringKey::Exclusive(value.to_string()),
RemoteWalLock::Read(_) => StringKey::Share(value.to_string()),
}
}
}
#[cfg(test)]
mod tests {
use common_procedure::StringKey;
@@ -308,5 +334,16 @@ mod tests {
string_key,
StringKey::Exclusive(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
);
// The remote wal lock
let string_key: StringKey = RemoteWalLock::Read("foo".to_string()).into();
assert_eq!(
string_key,
StringKey::Share(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
);
let string_key: StringKey = RemoteWalLock::Write("foo".to_string()).into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
);
}
}
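For orientation, a minimal sketch (not part of the patch) of how the new keys are meant to be used: the prune procedure declares the exclusive key for its topic, while any procedure that only depends on the topic's WAL would declare the shared key, so readers never block each other but are mutually exclusive with a concurrent prune.

```rust
use common_meta::lock_key::RemoteWalLock;
use common_procedure::{LockKey, StringKey};

// Hypothetical helper, for illustration only.
fn remote_wal_lock_keys(topic: &str) -> (LockKey, LockKey) {
    // The prune procedure takes the exclusive (write) key for its topic ...
    let write_key: StringKey = RemoteWalLock::Write(topic.to_string()).into();
    // ... while a reader of the same topic takes the shared (read) key.
    let read_key: StringKey = RemoteWalLock::Read(topic.to_string()).into();
    (LockKey::new(vec![write_key]), LockKey::new(vec![read_key]))
}
```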


@@ -95,6 +95,21 @@ impl LeaderRegionManifestInfo {
} => *data_flushed_entry_id,
}
}
/// Returns the minimum flushed entry id of the leader region.
/// It is used to determine up to which entry id the remote WAL can be safely pruned.
pub fn min_flushed_entry_id(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
flushed_entry_id, ..
} => *flushed_entry_id,
LeaderRegionManifestInfo::Metric {
data_flushed_entry_id,
metadata_flushed_entry_id,
..
} => (*data_flushed_entry_id).min(*metadata_flushed_entry_id),
}
}
}
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
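To make the pruning rule concrete, here is a tiny illustrative sketch (values are made up): a topic's prune point is the minimum of `min_flushed_entry_id()` over all of its regions, and for a Metric region that per-region value is itself the smaller of the data and metadata flushed entry ids.

```rust
fn main() {
    // Assumed per-region flushed entry ids for one topic:
    //   region A (Mito):   flushed_entry_id = 42
    //   region B (Metric): data = 40, metadata = 37 -> min_flushed_entry_id() = 37
    let per_region_min = [42u64, 40u64.min(37)];

    // Every region has flushed everything below this entry id, so the remote WAL
    // topic can be pruned up to it without losing unflushed data.
    let prune_point = per_region_min.iter().copied().min().unwrap();
    assert_eq!(prune_point, 37);
}
```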


@@ -30,7 +30,7 @@ use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
use crate::key::NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
-use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
+pub use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::{error, info};
use common_wal::config::kafka::MetasrvKafkaConfig;
use rskafka::client::error::Error as RsKafkaError;
@@ -32,9 +34,11 @@ use crate::error::{
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
type KafkaClientRef = Arc<Client>;
/// Creates topics in kafka.
pub struct KafkaTopicCreator {
-client: Client,
+client: KafkaClientRef,
/// The number of partitions per topic.
num_partitions: i32,
/// The replication factor of each topic.
@@ -44,6 +48,10 @@ pub struct KafkaTopicCreator {
}
impl KafkaTopicCreator {
pub fn client(&self) -> &KafkaClientRef {
&self.client
}
async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
let controller = client
.controller_client()
@@ -151,7 +159,7 @@ pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<Ka
})?;
Ok(KafkaTopicCreator {
-client,
+client: Arc::new(client),
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,
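The point of wrapping the client in an `Arc` shows up later in this commit: the prune procedure's `Context` reuses the creator's Kafka client instead of opening a second connection. A rough sketch, assuming a `MetasrvKafkaConfig` is already at hand:

```rust
use common_meta::wal_options_allocator::build_kafka_topic_creator;
use common_wal::config::kafka::MetasrvKafkaConfig;

async fn shared_kafka_client(config: &MetasrvKafkaConfig) {
    let topic_creator = build_kafka_topic_creator(config).await.unwrap();
    // `client()` now returns an Arc-wrapped client, so this clone is cheap and the
    // same connection can back both topic creation and the WAL prune procedure.
    let client = topic_creator.client().clone();
    drop(client); // e.g. handed to the `Context` of a `WalPruneProcedure`
}
```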


@@ -20,6 +20,7 @@ pub(crate) mod producer;
pub(crate) mod util;
pub(crate) mod worker;
pub use client_manager::DEFAULT_PARTITION;
pub use index::{default_index_file, GlobalIndexCollector};
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::Id as EntryId;


@@ -31,7 +31,7 @@ use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};
// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
-const DEFAULT_PARTITION: i32 = 0;
+pub const DEFAULT_PARTITION: i32 = 0;
/// Arc wrapper of ClientManager.
pub(crate) type ClientManagerRef = Arc<ClientManager>;


@@ -8,7 +8,7 @@ license.workspace = true
mock = []
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend", "dep:deadpool-postgres", "dep:deadpool"]
mysql_kvbackend = ["dep:sqlx", "common-meta/mysql_kvbackend"]
-testing = []
+testing = ["common-wal/testing"]
[lints]
workspace = true
@@ -52,12 +52,14 @@ humantime-serde.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
itertools.workspace = true
lazy_static.workspace = true
log-store.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
regex.workspace = true
rskafka.workspace = true
serde.workspace = true
serde_json.workspace = true
servers.workspace = true


@@ -787,6 +787,31 @@ pub enum Error {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
partition
))]
BuildPartitionClient {
topic: String,
partition: i32,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to delete record from Kafka"))]
DeleteRecord {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
topic: String,
partition: i32,
offset: u64,
},
}
impl Error {
@@ -834,7 +859,9 @@ impl ErrorExt for Error {
| Error::ExceededDeadline { .. }
| Error::ChooseItems { .. }
| Error::FlowStateHandler { .. }
-| Error::BuildWalOptionsAllocator { .. } => StatusCode::Internal,
+| Error::BuildWalOptionsAllocator { .. }
+| Error::BuildPartitionClient { .. }
+| Error::DeleteRecord { .. } => StatusCode::Internal,
Error::Unsupported { .. } => StatusCode::Unsupported,


@@ -24,6 +24,7 @@ pub mod test_util;
#[cfg(test)]
mod tests;
pub mod utils;
pub mod wal_prune;
#[derive(Clone)]
pub struct ProcedureManagerListenerAdapter(pub ProcedureManagerRef);


@@ -12,13 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
use common_meta::instruction::{
DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::region_registry::{
LeaderRegion, LeaderRegionManifestInfo, LeaderRegionRegistryRef,
};
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::Sequence;
use common_time::util::current_time_millis;
use common_wal::options::{KafkaWalOptions, WalOptions};
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::error::Result;
@@ -152,3 +165,94 @@ pub fn new_upgrade_region_reply(
)),
}
}
pub async fn new_wal_prune_metadata(
table_metadata_manager: TableMetadataManagerRef,
leader_region_registry: LeaderRegionRegistryRef,
n_region: u32,
n_table: u32,
offsets: &[i64],
threshold: u64,
topic: String,
) -> (EntryId, Vec<RegionId>) {
let datanode_id = 1;
let from_peer = Peer::empty(datanode_id);
let mut min_last_entry_id = 0;
let mut region_entry_ids = HashMap::with_capacity(n_table as usize * n_region as usize);
for table_id in 0..n_table {
let region_ids = (0..n_region)
.map(|i| RegionId::new(table_id, i))
.collect::<Vec<_>>();
let table_info = new_test_table_info(table_id, 0..n_region).into();
let region_routes = region_ids
.iter()
.map(|region_id| RegionRoute {
region: Region::new_test(*region_id),
leader_peer: Some(from_peer.clone()),
..Default::default()
})
.collect::<Vec<_>>();
let wal_options = WalOptions::Kafka(KafkaWalOptions {
topic: topic.clone(),
});
let wal_options = serde_json::to_string(&wal_options).unwrap();
let region_wal_options: HashMap<u32, String> = (0..n_region)
.map(|region_number| (region_number, wal_options.clone()))
.collect();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
region_wal_options,
)
.await
.unwrap();
let current_region_entry_ids = region_ids
.iter()
.map(|region_id| {
let rand_n = rand::random::<u64>() as usize;
let current_last_entry_id = offsets[rand_n % offsets.len()] as u64;
min_last_entry_id = min_last_entry_id.min(current_last_entry_id);
(*region_id, current_last_entry_id)
})
.collect::<HashMap<_, _>>();
region_entry_ids.extend(current_region_entry_ids.clone());
update_in_memory_region_last_entry_id(&leader_region_registry, current_region_entry_ids)
.await
.unwrap();
}
let regions_to_flush = region_entry_ids
.iter()
.filter_map(|(region_id, last_entry_id)| {
if last_entry_id - min_last_entry_id > threshold {
Some(*region_id)
} else {
None
}
})
.collect::<Vec<_>>();
(min_last_entry_id, regions_to_flush)
}
pub async fn update_in_memory_region_last_entry_id(
leader_region_registry: &LeaderRegionRegistryRef,
region_entry_ids: HashMap<RegionId, u64>,
) -> Result<()> {
let mut key_values = Vec::with_capacity(region_entry_ids.len());
for (region_id, flushed_entry_id) in region_entry_ids {
let value = LeaderRegion {
datanode_id: 1,
manifest: LeaderRegionManifestInfo::Mito {
manifest_version: 0,
flushed_entry_id,
},
};
key_values.push((region_id, value));
}
leader_region_registry.batch_put(key_values);
Ok(())
}


@@ -0,0 +1,434 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::warn;
use log_store::kafka::DEFAULT_PARTITION;
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use crate::error::{self, BuildPartitionClientSnafu, DeleteRecordSnafu, TableMetadataManagerSnafu};
use crate::Result;
type KafkaClientRef = Arc<Client>;
const TIMEOUT: i32 = 1000;
/// The state of WAL pruning.
#[derive(Debug, Serialize, Deserialize)]
pub enum WalPruneState {
Prepare,
Prune,
}
pub struct Context {
/// The Kafka client.
client: KafkaClientRef,
/// The table metadata manager.
table_metadata_manager: TableMetadataManagerRef,
leader_region_registry: LeaderRegionRegistryRef,
}
/// The data of WAL pruning.
#[derive(Serialize, Deserialize)]
pub struct WalPruneData {
/// The topic name to prune.
pub topic: String,
/// The minimum flushed entry id for the topic, which is used as the prune point of the WAL.
/// If the topic has no regions, the procedure finishes early in `on_prepare`.
pub min_flushed_entry_id: EntryId,
/// The state.
pub state: WalPruneState,
}
/// The procedure to prune WAL.
pub struct WalPruneProcedure {
pub data: WalPruneData,
pub context: Context,
}
impl WalPruneProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
pub fn new(topic: String, context: Context) -> Self {
Self {
data: WalPruneData {
topic,
min_flushed_entry_id: 0,
state: WalPruneState::Prepare,
},
context,
}
}
pub fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
Ok(Self { data, context })
}
/// Calculates the minimum flushed entry id up to which the topic can be pruned.
pub async fn on_prepare(&mut self) -> Result<Status> {
let region_ids = self
.context
.table_metadata_manager
.topic_region_manager()
.regions(&self.data.topic)
.await
.context(TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to get topic-region map",
})?;
let flush_entry_ids_map: HashMap<_, _> = self
.context
.leader_region_registry
.batch_get(region_ids.iter().cloned())
.into_iter()
.map(|(region_id, region)| {
let flushed_entry_id = region.manifest.min_flushed_entry_id();
(region_id, flushed_entry_id)
})
.collect();
if region_ids.is_empty() {
// No regions to prune.
return Ok(Status::done());
}
// Check if the `flush_entry_ids_map` contains all region ids.
let non_collected_region_ids =
check_heartbeat_collected_region_ids(&region_ids, &flush_entry_ids_map);
if !non_collected_region_ids.is_empty() {
// The heartbeat collected region ids do not contain all region ids in the topic-region map.
// In this case, we should not prune the WAL.
warn!("The heartbeat collected region ids do not contain all region ids in the topic-region map. Aborting the WAL prune procedure.
topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids);
return Ok(Status::done());
}
// Safety: `flush_entry_ids_map` is not empty here, so `min()` returns `Some`.
self.data.min_flushed_entry_id = *(flush_entry_ids_map.values().min().unwrap());
self.data.state = WalPruneState::Prune;
Ok(Status::executing(true))
}
/// Prune the WAL.
pub async fn on_prune(&mut self) -> Result<Status> {
// Safety: `min_flushed_entry_id` is set in `on_prepare` before reaching this state.
let partition_client = self
.context
.client
.partition_client(
self.data.topic.clone(),
DEFAULT_PARTITION,
UnknownTopicHandling::Retry,
)
.await
.context(BuildPartitionClientSnafu {
topic: self.data.topic.clone(),
partition: DEFAULT_PARTITION,
})?;
partition_client
.delete_records(self.data.min_flushed_entry_id as i64, TIMEOUT)
.await
.context(DeleteRecordSnafu {
topic: self.data.topic.clone(),
partition: DEFAULT_PARTITION,
offset: self.data.min_flushed_entry_id,
})
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to delete records",
})?;
// TODO(CookiePie): Persist the minimum flushed entry id to the table metadata manager.
Ok(Status::done())
}
}
#[async_trait::async_trait]
impl Procedure for WalPruneProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
fn rollback_supported(&self) -> bool {
false
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
match state {
WalPruneState::Prepare => self.on_prepare().await,
WalPruneState::Prune => self.on_prune().await,
}
.map_err(|e| {
if e.is_retryable() {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
/// The WAL prune procedure reads the topic-region map from the table metadata manager,
/// which is modified by `DROP [TABLE|DATABASE]` and `CREATE [TABLE]` operations.
/// Those modifications are atomic, so they do not conflict with this procedure;
/// at worst the procedure aborts early when `check_heartbeat_collected_region_ids` fails.
fn lock_key(&self) -> LockKey {
let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
LockKey::new(vec![lock_key])
}
}
/// Checks whether the heartbeat-collected region ids contain all region ids in the topic-region map, returning the ones that are missing.
fn check_heartbeat_collected_region_ids(
region_ids: &[RegionId],
heartbeat_collected_region_ids: &HashMap<RegionId, u64>,
) -> Vec<RegionId> {
let mut non_collected_region_ids = Vec::new();
for region_id in region_ids {
if !heartbeat_collected_region_ids.contains_key(region_id) {
non_collected_region_ids.push(*region_id);
}
}
non_collected_region_ids
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::wal_options_allocator::build_kafka_topic_creator;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use rskafka::record::Record;
use super::*;
use crate::procedure::test_util::new_wal_prune_metadata;
struct TestEnv {
table_metadata_manager: TableMetadataManagerRef,
leader_region_registry: LeaderRegionRegistryRef,
}
impl TestEnv {
fn new() -> Self {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
Self {
table_metadata_manager,
leader_region_registry,
}
}
fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
fn leader_region_registry(&self) -> &LeaderRegionRegistryRef {
&self.leader_region_registry
}
}
/// Mocks a test setup for the procedure. It:
/// 1. Uses a test env with a table metadata manager and an in-memory kv backend.
/// 2. Prepares some data in the table metadata manager and the in-memory kv backend.
/// 3. Generates a `WalPruneProcedure` with the test env.
/// 4. Returns the procedure, the minimum flushed entry id to prune and the regions to flush.
async fn mock_test_env(
topic: String,
broker_endpoints: Vec<String>,
env: &TestEnv,
) -> (WalPruneProcedure, u64, Vec<RegionId>) {
// Creates a topic manager.
let kafka_topic = KafkaTopicConfig {
replication_factor: broker_endpoints.len() as i16,
..Default::default()
};
let config = MetasrvKafkaConfig {
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let table_metadata_manager = env.table_metadata_manager().clone();
let leader_region_registry = env.leader_region_registry().clone();
let offsets = mock_wal_entries(topic_creator.client().clone(), &topic, 10).await;
let (min_last_entry_id, regions_to_flush) = new_wal_prune_metadata(
table_metadata_manager.clone(),
leader_region_registry.clone(),
10,
5,
&offsets,
10,
topic.clone(),
)
.await;
let context = Context {
client: topic_creator.client().clone(),
table_metadata_manager,
leader_region_registry,
};
let wal_prune_procedure = WalPruneProcedure::new(topic, context);
(wal_prune_procedure, min_last_entry_id, regions_to_flush)
}
fn record(i: usize) -> Record {
let key = format!("key_{i}");
let value = format!("value_{i}");
Record {
key: Some(key.into()),
value: Some(value.into()),
timestamp: chrono::Utc::now(),
headers: Default::default(),
}
}
async fn mock_wal_entries(
client: KafkaClientRef,
topic_name: &str,
n_entries: usize,
) -> Vec<i64> {
let controller_client = client.controller_client().unwrap();
let _ = controller_client
.create_topic(topic_name, 1, 1, 5_000)
.await;
let partition_client = client
.partition_client(topic_name, 0, UnknownTopicHandling::Retry)
.await
.unwrap();
let mut offsets = Vec::with_capacity(n_entries);
for i in 0..n_entries {
let record = vec![record(i)];
let offset = partition_client
.produce(
record,
rskafka::client::partition::Compression::NoCompression,
)
.await
.unwrap();
offsets.extend(offset);
}
offsets
}
async fn check_entry_id_existence(
client: KafkaClientRef,
topic_name: &str,
entry_id: i64,
) -> bool {
let partition_client = client
.partition_client(topic_name, 0, UnknownTopicHandling::Retry)
.await
.unwrap();
let (records, _high_watermark) = partition_client
.fetch_records(entry_id, 0..10001, 5_000)
.await
.unwrap();
!records.is_empty()
}
async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
let controller_client = client.controller_client().unwrap();
controller_client
.delete_topic(topic_name, 5_000)
.await
.unwrap();
}
#[tokio::test]
async fn test_procedure_execution() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
common_telemetry::init_default_ut_logging();
let topic_name = "greptime_test_topic".to_string();
let env = TestEnv::new();
let (mut procedure, min_last_entry_id, _) =
mock_test_env(topic_name.clone(), broker_endpoints, &env).await;
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(procedure.data.min_flushed_entry_id, min_last_entry_id);
// Step 2: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check that entries at and after `min_flushed_entry_id` still exist.
assert!(
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64,
)
.await
);
// Check that entries before `min_flushed_entry_id` are deleted.
assert!(
procedure.data.min_flushed_entry_id == 0
|| !check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64 - 1,
)
.await
);
// `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
procedure.context.leader_region_registry.reset();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;
})
})
.await;
}
}
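One note on the guard in `on_prepare`: pruning is skipped unless every region of the topic has reported a flushed entry id through heartbeat, since a single unreported region might still need the old entries. The sketch below mirrors that check with made-up region ids (it re-implements the private helper inline rather than calling it):

```rust
use std::collections::HashMap;
use store_api::storage::RegionId;

fn main() {
    let region_ids = vec![RegionId::new(1024, 0), RegionId::new(1024, 1)];

    // Only region (1024, 0) has reported its flushed entry id via heartbeat.
    let mut collected: HashMap<RegionId, u64> = HashMap::new();
    collected.insert(RegionId::new(1024, 0), 42);

    // Equivalent of `check_heartbeat_collected_region_ids`: collect the regions that
    // have not been reported; a non-empty result aborts the prune with a warning.
    let missing: Vec<RegionId> = region_ids
        .iter()
        .filter(|id| !collected.contains_key(*id))
        .copied()
        .collect();
    assert_eq!(missing, vec![RegionId::new(1024, 1)]);
}
```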


@@ -409,9 +409,9 @@ impl RegionManifestInfo {
flushed_entry_id, ..
} => *flushed_entry_id,
RegionManifestInfo::Metric {
-metadata_flushed_entry_id,
+data_flushed_entry_id,
..
-} => *metadata_flushed_entry_id,
+} => *data_flushed_entry_id,
}
}