feat(remote-wal): send flush request when pruning remote wal (#5825)

* feat: update minimum entry id in kvbackend

* fix: persist before delete

* chore: apply comments

* feat: add flush region in wal prune procedure

* fix: cherry-pick error

* chore: fmt

* chore: drop rx to avoid block by response

* chore: update comments

* chore: apply review comments

* test: fix unit test

* feat: add option not to flush region during wal prune

* test: fix unit test

* fix: delete at minimum replay entry id + 1

* fix: cas

* chore: add comments

* chore: apply review comments

* chore: apply review comments

* chore: fix error msg

* chore: apply review comments

* fix: idempotent cas

* refactor: use a one-way sender

* chore: better err msg

* chore: fix unit test

* chore: apply review comments

* chore: apply review comments

* chore: replace send oneway
This commit is contained in:
Yuhan Wang
2025-04-07 22:05:18 +08:00
committed by GitHub
parent 981d51785b
commit 6e6e335a81
14 changed files with 702 additions and 182 deletions

View File

@@ -192,6 +192,12 @@ pub struct DropFlow {
pub flownode_ids: Vec<FlownodeId>,
}
/// Flushes a batch of regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FlushRegions {
pub region_ids: Vec<RegionId>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens a region.
@@ -208,6 +214,8 @@ pub enum Instruction {
DowngradeRegion(DowngradeRegion),
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegion(FlushRegions),
}
/// The reply of [UpgradeRegion].

View File

@@ -57,7 +57,10 @@
//! - This key is mainly used in constructing the view in Datanode and Frontend.
//!
//! 12. Kafka topic key: `__topic_name/kafka/{topic_name}`
//! - The key is used to mark existing topics in kafka for WAL.
//! - The key is used to track existing topics in Kafka.
//! - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents
//! the highest entry id that has been pruned from the remote WAL.
//! - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id).
//!
//! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}`
//! - Mapping {topic_name} to {region_id}
@@ -137,6 +140,7 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};
use topic_name::TopicNameManager;
use topic_region::{TopicRegionKey, TopicRegionManager};
use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
@@ -309,6 +313,7 @@ pub struct TableMetadataManager {
schema_manager: SchemaManager,
table_route_manager: TableRouteManager,
tombstone_manager: TombstoneManager,
topic_name_manager: TopicNameManager,
topic_region_manager: TopicRegionManager,
kv_backend: KvBackendRef,
}
@@ -460,6 +465,7 @@ impl TableMetadataManager {
schema_manager: SchemaManager::new(kv_backend.clone()),
table_route_manager: TableRouteManager::new(kv_backend.clone()),
tombstone_manager: TombstoneManager::new(kv_backend.clone()),
topic_name_manager: TopicNameManager::new(kv_backend.clone()),
topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
kv_backend,
}
@@ -513,6 +519,10 @@ impl TableMetadataManager {
&self.table_route_manager
}
pub fn topic_name_manager(&self) -> &TopicNameManager {
&self.topic_name_manager
}
pub fn topic_region_manager(&self) -> &TopicRegionManager {
&self.topic_region_manager
}

View File

@@ -15,11 +15,14 @@
use std::fmt::{self, Display};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result};
use crate::ensure_values;
use crate::error::{self, DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result, UnexpectedSnafu};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
DeserializedValueWithBytes, MetadataKey, MetadataValue, KAFKA_TOPIC_KEY_PATTERN,
KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX,
};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
@@ -31,8 +34,32 @@ pub struct TopicNameKey<'a> {
pub topic: &'a str,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TopicNameValue;
/// The value associated with a topic name key.
///
/// The `pruned_entry_id` is the highest entry id that has been pruned from the remote WAL.
/// When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimal available entry id).
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub struct TopicNameValue {
pub pruned_entry_id: u64,
}
impl TopicNameValue {
pub fn new(pruned_entry_id: u64) -> Self {
Self { pruned_entry_id }
}
}
impl MetadataValue for TopicNameValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self> {
let value = serde_json::from_slice::<TopicNameValue>(raw_value).context(DecodeJsonSnafu)?;
Ok(value)
}
fn try_as_raw_value(&self) -> Result<Vec<u8>> {
let raw_value = serde_json::to_vec(self).context(DecodeJsonSnafu)?;
Ok(raw_value)
}
}
impl<'a> TopicNameKey<'a> {
pub fn new(topic: &'a str) -> Self {
@@ -114,13 +141,16 @@ impl TopicNameManager {
{
let topics =
serde_json::from_slice::<Vec<String>>(&kv.value).context(DecodeJsonSnafu)?;
let mut reqs = topics
.iter()
.map(|topic| {
let key = TopicNameKey::new(topic);
TxnOp::Put(key.to_bytes(), vec![])
})
.collect::<Vec<_>>();
let mut reqs = Vec::with_capacity(topics.len() + 1);
for topic in topics {
let topic_name_key = TopicNameKey::new(&topic);
let topic_name_value = TopicNameValue::new(0);
let put_req = TxnOp::Put(
topic_name_key.to_bytes(),
topic_name_value.try_as_raw_value()?,
);
reqs.push(put_req);
}
let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec());
reqs.push(delete_req);
let txn = Txn::new().and_then(reqs);
@@ -129,7 +159,7 @@ impl TopicNameManager {
Ok(())
}
/// Range query for topics.
/// Range query for topics. Only the keys are returned.
/// Caution: this method returns keys as String instead of values of range query since the topics are stored in keys.
pub async fn range(&self) -> Result<Vec<String>> {
let prefix = TopicNameKey::range_start_key();
@@ -142,25 +172,72 @@ impl TopicNameManager {
.collect::<Result<Vec<String>>>()
}
/// Put topics into kvbackend.
/// Put topics into kvbackend. The value is set to 0 by default.
pub async fn batch_put(&self, topic_name_keys: Vec<TopicNameKey<'_>>) -> Result<()> {
let mut kvs = Vec::with_capacity(topic_name_keys.len());
let topic_name_value = TopicNameValue::new(0);
for topic_name_key in &topic_name_keys {
let kv = KeyValue {
key: topic_name_key.to_bytes(),
value: topic_name_value.clone().try_as_raw_value()?,
};
kvs.push(kv);
}
let req = BatchPutRequest {
kvs: topic_name_keys
.iter()
.map(|key| KeyValue {
key: key.to_bytes(),
value: vec![],
})
.collect(),
kvs,
prev_kv: false,
};
self.kv_backend.batch_put(req).await?;
Ok(())
}
/// Get value for a specific topic.
pub async fn get(
&self,
topic: &str,
) -> Result<Option<DeserializedValueWithBytes<TopicNameValue>>> {
let key = TopicNameKey::new(topic);
let raw_key = key.to_bytes();
self.kv_backend
.get(&raw_key)
.await?
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
/// Update the topic name key and value in the kv backend.
pub async fn update(
&self,
topic: &str,
pruned_entry_id: u64,
prev: Option<DeserializedValueWithBytes<TopicNameValue>>,
) -> Result<()> {
let key = TopicNameKey::new(topic);
let raw_key = key.to_bytes();
let value = TopicNameValue::new(pruned_entry_id);
let new_raw_value = value.try_as_raw_value()?;
let raw_value = prev.map(|v| v.get_raw_bytes()).unwrap_or_default();
let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value.clone());
let mut r = self.kv_backend.txn(txn).await?;
if !r.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let raw_value = TxnOpGetResponseSet::filter(raw_key)(&mut set)
.context(UnexpectedSnafu {
err_msg: "Reads the empty topic name value in comparing operation while updating TopicNameValue",
})?;
let op_name = "updating TopicNameValue";
ensure_values!(raw_value, new_raw_value, op_name);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use super::*;
@@ -204,7 +281,19 @@ mod tests {
let topics = manager.range().await.unwrap();
assert_eq!(topics, all_topics);
let topics = manager.range().await.unwrap();
assert_eq!(topics, all_topics);
for topic in &topics {
let value = manager.get(topic).await.unwrap().unwrap();
assert_eq!(value.pruned_entry_id, 0);
manager.update(topic, 1, Some(value.clone())).await.unwrap();
let new_value = manager.get(topic).await.unwrap().unwrap();
assert_eq!(new_value.pruned_entry_id, 1);
// Update twice, nothing changed
manager.update(topic, 1, Some(value.clone())).await.unwrap();
let new_value = manager.get(topic).await.unwrap().unwrap();
assert_eq!(new_value.pruned_entry_id, 1);
// Bad cas, emit error
let err = manager.update(topic, 3, Some(value)).await.unwrap_err();
assert_matches!(err, error::Error::Unexpected { .. });
}
}
}

View File

@@ -26,6 +26,7 @@ use store_api::storage::RegionId;
mod close_region;
mod downgrade_region;
mod flush_region;
mod open_region;
mod upgrade_region;
@@ -42,7 +43,7 @@ pub struct RegionHeartbeatResponseHandler {
/// Handler of the instruction.
pub type InstructionHandler =
Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, InstructionReply> + Send>;
Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, Option<InstructionReply>> + Send>;
#[derive(Clone)]
pub struct HandlerContext {
@@ -94,6 +95,9 @@ impl RegionHeartbeatResponseHandler {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_regions)
})),
}
}
}
@@ -129,8 +133,10 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
})
.await;
if let Err(e) = mailbox.send((meta, reply)).await {
error!(e; "Failed to send reply to mailbox");
if let Some(reply) = reply {
if let Err(e) = mailbox.send((meta, reply)).await {
error!(e; "Failed to send reply to mailbox");
}
}
});

View File

@@ -26,28 +26,28 @@ impl HandlerContext {
pub(crate) fn handle_close_region_instruction(
self,
region_ident: RegionIdent,
) -> BoxFuture<'static, InstructionReply> {
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
let request = RegionRequest::Close(RegionCloseRequest {});
let result = self.region_server.handle_request(region_id, request).await;
match result {
Ok(_) => InstructionReply::CloseRegion(SimpleReply {
Ok(_) => Some(InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
}),
})),
Err(error::Error::RegionNotFound { .. }) => {
warn!("Received a close region instruction from meta, but target region:{region_id} is not found.");
InstructionReply::CloseRegion(SimpleReply {
Some(InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
})
}))
}
Err(err) => InstructionReply::CloseRegion(SimpleReply {
Err(err) => Some(InstructionReply::CloseRegion(SimpleReply {
result: false,
error: Some(format!("{err:?}")),
}),
})),
}
})
}

View File

@@ -24,31 +24,34 @@ use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::task_tracker::WaitResult;
impl HandlerContext {
async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> InstructionReply {
async fn downgrade_to_follower_gracefully(
&self,
region_id: RegionId,
) -> Option<InstructionReply> {
match self
.region_server
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
{
Ok(SetRegionRoleStateResponse::Success { last_entry_id }) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists: true,
error: None,
})
}))
}
Ok(SetRegionRoleStateResponse::NotFound) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
})
}))
}
Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply {
Err(err) => Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}),
})),
}
}
@@ -59,15 +62,15 @@ impl HandlerContext {
flush_timeout,
reject_write,
}: DowngradeRegion,
) -> BoxFuture<'static, InstructionReply> {
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
warn!("Region: {region_id} is not found");
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
});
}));
};
let region_server_moved = self.region_server.clone();
@@ -99,19 +102,19 @@ impl HandlerContext {
Ok(SetRegionRoleStateResponse::Success { .. }) => {}
Ok(SetRegionRoleStateResponse::NotFound) => {
warn!("Region: {region_id} is not found");
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
});
}));
}
Err(err) => {
warn!(err; "Failed to convert region to downgrading leader");
return InstructionReply::DowngradeRegion(DowngradeRegionReply {
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
});
}));
}
}
}
@@ -144,18 +147,20 @@ impl HandlerContext {
let result = self.catchup_tasks.wait(&mut watcher, flush_timeout).await;
match result {
WaitResult::Timeout => InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("Flush region: {region_id} is timeout")),
}),
WaitResult::Timeout => {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("Flush region: {region_id} is timeout")),
}))
}
WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
WaitResult::Finish(Err(err)) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
})
}))
}
}
})
@@ -196,9 +201,9 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply {
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert!(!reply.exists);
assert!(reply.error.is_none());
assert!(reply.last_entry_id.is_none());
@@ -238,9 +243,9 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply {
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert!(reply.exists);
assert!(reply.error.is_none());
assert_eq!(reply.last_entry_id.unwrap(), 1024);
@@ -272,9 +277,9 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply {
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("timeout"));
assert!(reply.last_entry_id.is_none());
@@ -310,8 +315,8 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("timeout"));
assert!(reply.last_entry_id.is_none());
@@ -325,11 +330,11 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);
if let InstructionReply::DowngradeRegion(reply) = reply {
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert!(reply.exists);
assert!(reply.error.is_none());
assert_eq!(reply.last_entry_id.unwrap(), 1024);
@@ -371,8 +376,8 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("timeout"));
assert!(reply.last_entry_id.is_none());
@@ -386,11 +391,11 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);
if let InstructionReply::DowngradeRegion(reply) = reply {
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert!(reply.exists);
assert!(reply.error.unwrap().contains("flush failed"));
assert!(reply.last_entry_id.is_none());
@@ -417,8 +422,8 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert!(!reply.exists);
assert!(reply.error.is_none());
assert!(reply.last_entry_id.is_none());
@@ -449,8 +454,8 @@ mod tests {
reject_write: false,
})
.await;
assert_matches!(reply, InstructionReply::DowngradeRegion(_));
if let InstructionReply::DowngradeRegion(reply) = reply {
assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() {
assert!(reply.exists);
assert!(reply
.error

View File

@@ -0,0 +1,104 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{FlushRegions, InstructionReply};
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use crate::error;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_flush_region_instruction(
self,
flush_regions: FlushRegions,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
for region_id in flush_regions.region_ids {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
let result = self.region_server.handle_request(region_id, request).await;
match result {
Ok(_) => {}
Err(error::Error::RegionNotFound { .. }) => {
warn!("Received a flush region instruction from meta, but target region: {region_id} is not found.");
}
Err(err) => {
warn!(
"Failed to flush region: {region_id}, error: {err}",
region_id = region_id,
err = err,
);
}
}
}
None
})
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, RwLock};
use common_meta::instruction::FlushRegions;
use mito2::engine::MITO_ENGINE_NAME;
use store_api::storage::RegionId;
use super::*;
use crate::tests::{mock_region_server, MockRegionEngine};
#[tokio::test]
async fn test_handle_flush_region_instruction() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let mock_region_server = mock_region_server();
let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
for region_id in &region_ids {
let flushed_region_ids_ref = flushed_region_ids.clone();
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
region_engine.handle_request_mock_fn =
Some(Box::new(move |region_id, _request| {
flushed_region_ids_ref.write().unwrap().push(region_id);
Ok(0)
}))
});
mock_region_server.register_test_region(*region_id, mock_engine);
}
let handler_context = HandlerContext::new_for_test(mock_region_server);
let reply = handler_context
.clone()
.handle_flush_region_instruction(FlushRegions {
region_ids: region_ids.clone(),
})
.await;
assert!(reply.is_none());
assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
flushed_region_ids.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let reply = handler_context
.handle_flush_region_instruction(FlushRegions {
region_ids: not_found_region_ids.clone(),
})
.await;
assert!(reply.is_none());
assert!(flushed_region_ids.read().unwrap().is_empty());
}
}

View File

@@ -30,7 +30,7 @@ impl HandlerContext {
region_wal_options,
skip_wal_replay,
}: OpenRegion,
) -> BoxFuture<'static, InstructionReply> {
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
prepare_wal_options(&mut region_options, region_id, &region_wal_options);
@@ -43,10 +43,10 @@ impl HandlerContext {
let result = self.region_server.handle_request(region_id, request).await;
let success = result.is_ok();
let error = result.as_ref().map_err(|e| format!("{e:?}")).err();
InstructionReply::OpenRegion(SimpleReply {
Some(InstructionReply::OpenRegion(SimpleReply {
result: success,
error,
})
}))
})
}
}

View File

@@ -29,22 +29,22 @@ impl HandlerContext {
replay_timeout,
location_id,
}: UpgradeRegion,
) -> BoxFuture<'static, InstructionReply> {
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: false,
error: None,
});
}));
};
if writable {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
});
}));
}
let region_server_moved = self.region_server.clone();
@@ -79,11 +79,11 @@ impl HandlerContext {
// Returns immediately
let Some(replay_timeout) = replay_timeout else {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
});
}));
};
// We don't care that it returns a newly registered or running task.
@@ -91,22 +91,24 @@ impl HandlerContext {
let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await;
match result {
WaitResult::Timeout => InstructionReply::UpgradeRegion(UpgradeRegionReply {
WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
}),
WaitResult::Finish(Ok(_)) => InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}),
})),
WaitResult::Finish(Ok(_)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}))
}
WaitResult::Finish(Err(err)) => {
InstructionReply::UpgradeRegion(UpgradeRegionReply {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: Some(format!("{err:?}")),
})
}))
}
}
})
@@ -149,9 +151,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
if let InstructionReply::UpgradeRegion(reply) = reply {
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.exists);
assert!(reply.error.is_none());
}
@@ -187,9 +189,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
if let InstructionReply::UpgradeRegion(reply) = reply {
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -226,9 +228,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
if let InstructionReply::UpgradeRegion(reply) = reply {
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -268,9 +270,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
if let InstructionReply::UpgradeRegion(reply) = reply {
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -286,11 +288,11 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);
if let InstructionReply::UpgradeRegion(reply) = reply {
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -328,10 +330,10 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
// It didn't wait for handle returns; it had no idea about the error.
if let InstructionReply::UpgradeRegion(reply) = reply {
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -346,9 +348,9 @@ mod tests {
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
if let InstructionReply::UpgradeRegion(reply) = reply {
if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_some());

View File

@@ -802,8 +802,13 @@ pub enum Error {
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to delete record from Kafka"))]
DeleteRecord {
#[snafu(display(
"Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
topic,
partition,
offset
))]
DeleteRecords {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
@@ -812,6 +817,15 @@ pub enum Error {
partition: i32,
offset: u64,
},
#[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
UpdateTopicNameValue {
topic: String,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: common_meta::error::Error,
},
}
impl Error {
@@ -861,7 +875,7 @@ impl ErrorExt for Error {
| Error::FlowStateHandler { .. }
| Error::BuildWalOptionsAllocator { .. }
| Error::BuildPartitionClient { .. }
| Error::DeleteRecord { .. } => StatusCode::Internal,
| Error::DeleteRecords { .. } => StatusCode::Internal,
Error::Unsupported { .. } => StatusCode::Unsupported,
@@ -924,7 +938,8 @@ impl ErrorExt for Error {
| Error::TableMetadataManager { source, .. }
| Error::MaintenanceModeManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),
| Error::UnexpectedLogicalRouteTable { source, .. }
| Error::UpdateTopicNameValue { source, .. } => source.status_code(),
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()

View File

@@ -444,7 +444,7 @@ impl Mailbox for HeartbeatMailbox {
let (tx, rx) = oneshot::channel();
let _ = self.senders.insert(message_id, tx);
let deadline = Instant::now() + timeout;
let _ = self.timeouts.insert(message_id, deadline);
self.timeouts.insert(message_id, deadline);
self.timeout_notify.notify_one();
self.pushers.push(pusher_id, msg).await?;
@@ -452,6 +452,18 @@ impl Mailbox for HeartbeatMailbox {
Ok(MailboxReceiver::new(message_id, rx, *ch))
}
async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
let message_id = 0; // one-way message, same as `broadcast`
msg.id = message_id;
let pusher_id = ch.pusher_id();
debug!("Sending mailbox message {msg:?} to {pusher_id}");
self.pushers.push(pusher_id, msg).await?;
Ok(())
}
async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
self.pushers.broadcast(ch.pusher_range(), msg).await
}

View File

@@ -21,6 +21,7 @@ use common_meta::instruction::{
};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::topic_name::TopicNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::region_registry::{
@@ -177,7 +178,8 @@ pub async fn new_wal_prune_metadata(
) -> (EntryId, Vec<RegionId>) {
let datanode_id = 1;
let from_peer = Peer::empty(datanode_id);
let mut min_last_entry_id = 0;
let mut min_flushed_entry_id = u64::MAX;
let mut max_flushed_entry_id = 0;
let mut region_entry_ids = HashMap::with_capacity(n_table as usize * n_region as usize);
for table_id in 0..n_table {
let region_ids = (0..n_region)
@@ -208,36 +210,42 @@ pub async fn new_wal_prune_metadata(
)
.await
.unwrap();
table_metadata_manager
.topic_name_manager()
.batch_put(vec![TopicNameKey::new(&topic)])
.await
.unwrap();
let current_region_entry_ids = region_ids
.iter()
.map(|region_id| {
let rand_n = rand::random::<u64>() as usize;
let current_last_entry_id = offsets[rand_n % offsets.len()] as u64;
min_last_entry_id = min_last_entry_id.min(current_last_entry_id);
(*region_id, current_last_entry_id)
let current_flushed_entry_id = offsets[rand_n % offsets.len()] as u64;
min_flushed_entry_id = min_flushed_entry_id.min(current_flushed_entry_id);
max_flushed_entry_id = max_flushed_entry_id.max(current_flushed_entry_id);
(*region_id, current_flushed_entry_id)
})
.collect::<HashMap<_, _>>();
region_entry_ids.extend(current_region_entry_ids.clone());
update_in_memory_region_last_entry_id(&leader_region_registry, current_region_entry_ids)
update_in_memory_region_flushed_entry_id(&leader_region_registry, current_region_entry_ids)
.await
.unwrap();
}
let regions_to_flush = region_entry_ids
.iter()
.filter_map(|(region_id, last_entry_id)| {
if last_entry_id - min_last_entry_id > threshold {
.filter_map(|(region_id, flushed_entry_id)| {
if max_flushed_entry_id - flushed_entry_id > threshold {
Some(*region_id)
} else {
None
}
})
.collect::<Vec<_>>();
(min_last_entry_id, regions_to_flush)
(min_flushed_entry_id, regions_to_flush)
}
pub async fn update_in_memory_region_last_entry_id(
pub async fn update_in_memory_region_flushed_entry_id(
leader_region_registry: &LeaderRegionRegistryRef,
region_entry_ids: HashMap<RegionId, u64>,
) -> Result<()> {

View File

@@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::instruction::{FlushRegions, Instruction};
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
use common_meta::peer::Peer;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
@@ -25,6 +29,7 @@ use common_procedure::{
Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::warn;
use itertools::{Itertools, MinMaxResult};
use log_store::kafka::DEFAULT_PARTITION;
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::Client;
@@ -33,17 +38,23 @@ use snafu::ResultExt;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use crate::error::{self, BuildPartitionClientSnafu, DeleteRecordSnafu, TableMetadataManagerSnafu};
use crate::error::{
self, BuildPartitionClientSnafu, DeleteRecordsSnafu, TableMetadataManagerSnafu,
UpdateTopicNameValueSnafu,
};
use crate::service::mailbox::{Channel, MailboxRef};
use crate::Result;
type KafkaClientRef = Arc<Client>;
const TIMEOUT: i32 = 1000;
/// No timeout for flush request.
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(1);
/// The state of WAL pruning.
#[derive(Debug, Serialize, Deserialize)]
pub enum WalPruneState {
Prepare,
FlushRegion,
Prune,
}
@@ -52,7 +63,12 @@ pub struct Context {
client: KafkaClientRef,
/// The table metadata manager.
table_metadata_manager: TableMetadataManagerRef,
/// The leader region registry.
leader_region_registry: LeaderRegionRegistryRef,
/// Server address of metasrv.
server_addr: String,
/// The mailbox to send messages.
mailbox: MailboxRef,
}
/// The data of WAL pruning.
@@ -61,8 +77,10 @@ pub struct WalPruneData {
/// The topic name to prune.
pub topic: String,
/// The minimum flush entry id for topic, which is used to prune the WAL.
/// If the topic has no region, the value is set to `None`.
pub min_flushed_entry_id: EntryId,
pub regions_to_flush: Vec<RegionId>,
/// If `flushed_entry_id` + `trigger_flush_threshold` < `max_flushed_entry_id`, send a flush request to the region.
pub trigger_flush_threshold: Option<u64>,
/// The state.
pub state: WalPruneState,
}
@@ -76,11 +94,13 @@ pub struct WalPruneProcedure {
impl WalPruneProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
pub fn new(topic: String, context: Context) -> Self {
pub fn new(topic: String, context: Context, trigger_flush_threshold: Option<u64>) -> Self {
Self {
data: WalPruneData {
topic,
min_flushed_entry_id: 0,
trigger_flush_threshold,
regions_to_flush: vec![],
state: WalPruneState::Prepare,
},
context,
@@ -92,7 +112,64 @@ impl WalPruneProcedure {
Ok(Self { data, context })
}
/// Calculate the last entry id to prune for each topic.
async fn build_peer_to_region_ids_map(
&self,
ctx: &Context,
region_ids: &[RegionId],
) -> Result<HashMap<Peer, Vec<RegionId>>> {
let table_ids = region_ids
.iter()
.map(|region_id| region_id.table_id())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let table_ids_table_routes_map = ctx
.table_metadata_manager
.table_route_manager()
.batch_get_physical_table_routes(&table_ids)
.await
.context(TableMetadataManagerSnafu)?;
let mut peer_region_ids_map = HashMap::new();
for region_id in region_ids {
let table_id = region_id.table_id();
let table_route = match table_ids_table_routes_map.get(&table_id) {
Some(route) => route,
None => return error::TableRouteNotFoundSnafu { table_id }.fail(),
};
for region_route in &table_route.region_routes {
if region_route.region.id != *region_id {
continue;
}
if let Some(peer) = &region_route.leader_peer {
peer_region_ids_map
.entry(peer.clone())
.or_insert_with(Vec::new)
.push(*region_id);
}
}
}
Ok(peer_region_ids_map)
}
fn build_flush_region_instruction(
&self,
peer_region_ids_map: HashMap<Peer, Vec<RegionId>>,
) -> Result<Vec<(Peer, Instruction)>> {
let peer_and_instructions = peer_region_ids_map
.into_iter()
.map(|(peer, region_ids)| {
let flush_instruction = Instruction::FlushRegion(FlushRegions { region_ids });
(peer.clone(), flush_instruction)
})
.collect();
Ok(peer_and_instructions)
}
/// Prepare the entry id to prune and regions to flush.
///
/// Retry:
/// - Failed to retrieve any metadata.
pub async fn on_prepare(&mut self) -> Result<Status> {
let region_ids = self
.context
@@ -116,10 +193,6 @@ impl WalPruneProcedure {
})
.collect();
if region_ids.is_empty() {
// No regions to prune.
return Ok(Status::done());
}
// Check if the `flush_entry_ids_map` contains all region ids.
let non_collected_region_ids =
check_heartbeat_collected_region_ids(&region_ids, &flush_entry_ids_map);
@@ -131,15 +204,64 @@ impl WalPruneProcedure {
return Ok(Status::done());
}
// Safety: `flush_entry_ids_map` are not empty.
self.data.min_flushed_entry_id = *(flush_entry_ids_map.values().min().unwrap());
let min_max_result = flush_entry_ids_map.values().minmax();
let max_flushed_entry_id = match min_max_result {
MinMaxResult::NoElements => {
return Ok(Status::done());
}
MinMaxResult::OneElement(flushed_entry_id) => {
self.data.min_flushed_entry_id = *flushed_entry_id;
*flushed_entry_id
}
MinMaxResult::MinMax(min_flushed_entry_id, max_flushed_entry_id) => {
self.data.min_flushed_entry_id = *min_flushed_entry_id;
*max_flushed_entry_id
}
};
if let Some(threshold) = self.data.trigger_flush_threshold {
for (region_id, flushed_entry_id) in flush_entry_ids_map {
if flushed_entry_id + threshold < max_flushed_entry_id {
self.data.regions_to_flush.push(region_id);
}
}
self.data.state = WalPruneState::FlushRegion;
} else {
self.data.state = WalPruneState::Prune;
}
Ok(Status::executing(true))
}
/// Send the flush request to regions with low flush entry id.
pub async fn on_sending_flush_request(&mut self) -> Result<Status> {
let peer_to_region_ids_map = self
.build_peer_to_region_ids_map(&self.context, &self.data.regions_to_flush)
.await?;
let flush_instructions = self.build_flush_region_instruction(peer_to_region_ids_map)?;
for (peer, flush_instruction) in flush_instructions.into_iter() {
let msg = MailboxMessage::json_message(
&format!("Flush regions: {}", flush_instruction),
&format!("Metasrv@{}", self.context.server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(peer.id);
self.context.mailbox.send_oneway(&ch, msg).await?;
}
self.data.state = WalPruneState::Prune;
Ok(Status::executing(true))
}
/// Prune the WAL.
/// Prune the WAL and persist the minimum flushed entry id.
///
/// Retry:
/// - Failed to update the minimum flushed entry id in kvbackend.
/// - Failed to delete records.
pub async fn on_prune(&mut self) -> Result<Status> {
// Safety: last_entry_ids are loaded in on_prepare.
// Safety: flushed_entry_ids are loaded in on_prepare.
let partition_client = self
.context
.client
@@ -154,20 +276,54 @@ impl WalPruneProcedure {
partition: DEFAULT_PARTITION,
})?;
partition_client
.delete_records(self.data.min_flushed_entry_id as i64, TIMEOUT)
// Should update the min flushed entry id in the kv backend before deleting records.
// Otherwise, when a datanode restarts, it will not be able to find the wal entries.
let prev = self
.context
.table_metadata_manager
.topic_name_manager()
.get(&self.data.topic)
.await
.context(DeleteRecordSnafu {
topic: self.data.topic.clone(),
partition: DEFAULT_PARTITION,
offset: self.data.min_flushed_entry_id,
.context(TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TopicNameValue, topic: {}", self.data.topic),
})?;
self.context
.table_metadata_manager
.topic_name_manager()
.update(&self.data.topic, self.data.min_flushed_entry_id, prev)
.await
.context(UpdateTopicNameValueSnafu {
topic: &self.data.topic,
})
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: "Failed to delete records",
reason: format!(
"Failed to update pruned entry id for topic: {}",
self.data.topic
),
})?;
partition_client
.delete_records(
(self.data.min_flushed_entry_id + 1) as i64,
DELETE_RECORDS_TIMEOUT.as_millis() as i32,
)
.await
.context(DeleteRecordsSnafu {
topic: &self.data.topic,
partition: DEFAULT_PARTITION,
offset: (self.data.min_flushed_entry_id + 1),
})
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to delete records for topic: {}, partition: {}, offset: {}",
self.data.topic,
DEFAULT_PARTITION,
self.data.min_flushed_entry_id + 1
),
})?;
// TODO(CookiePie): Persist the minimum flushed entry id to the table metadata manager.
Ok(Status::done())
}
}
@@ -187,6 +343,7 @@ impl Procedure for WalPruneProcedure {
match state {
WalPruneState::Prepare => self.on_prepare().await,
WalPruneState::FlushRegion => self.on_sending_flush_request().await,
WalPruneState::Prune => self.on_prune().await,
}
.map_err(|e| {
@@ -230,21 +387,27 @@ fn check_heartbeat_collected_region_ids(
mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::HeartbeatResponse;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::build_kafka_topic_creator;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use rskafka::record::Record;
use tokio::sync::mpsc::Receiver;
use super::*;
use crate::procedure::test_util::new_wal_prune_metadata;
use crate::handler::HeartbeatMailbox;
use crate::procedure::test_util::{new_wal_prune_metadata, MailboxContext};
struct TestEnv {
table_metadata_manager: TableMetadataManagerRef,
leader_region_registry: LeaderRegionRegistryRef,
mailbox: MailboxContext,
server_addr: String,
}
impl TestEnv {
@@ -252,9 +415,16 @@ mod tests {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
Self {
table_metadata_manager,
leader_region_registry,
mailbox: mailbox_ctx,
server_addr: "localhost".to_string(),
}
}
@@ -265,14 +435,21 @@ mod tests {
fn leader_region_registry(&self) -> &LeaderRegionRegistryRef {
&self.leader_region_registry
}
fn mailbox_context(&self) -> &MailboxContext {
&self.mailbox
}
fn server_addr(&self) -> &str {
&self.server_addr
}
}
/// Mock a test env for testing.
/// Including:
/// 1. Create a test env with a mailbox, a table metadata manager and a in-memory kv backend.
/// 2. Prepare some data in the table metadata manager and in-memory kv backend.
/// 3. Generate a `WalPruneProcedure` with the test env.
/// 4. Return the test env, the procedure, the minimum last entry id to prune and the regions to flush.
/// 1. Prepare some data in the table metadata manager and in-memory kv backend.
/// 2. Generate a `WalPruneProcedure` with the test env.
/// 3. Return the procedure, the minimum last entry id to prune and the regions to flush.
async fn mock_test_env(
topic: String,
broker_endpoints: Vec<String>,
@@ -294,15 +471,26 @@ mod tests {
let topic_creator = build_kafka_topic_creator(&config).await.unwrap();
let table_metadata_manager = env.table_metadata_manager().clone();
let leader_region_registry = env.leader_region_registry().clone();
let offsets = mock_wal_entries(topic_creator.client().clone(), &topic, 10).await;
let mailbox = env.mailbox_context().mailbox().clone();
let (min_last_entry_id, regions_to_flush) = new_wal_prune_metadata(
let n_region = 10;
let n_table = 5;
let threshold = 10;
// 5 entries per region.
let offsets = mock_wal_entries(
topic_creator.client().clone(),
&topic,
(n_region * n_table * 5) as usize,
)
.await;
let (min_flushed_entry_id, regions_to_flush) = new_wal_prune_metadata(
table_metadata_manager.clone(),
leader_region_registry.clone(),
10,
5,
n_region,
n_table,
&offsets,
10,
threshold,
topic.clone(),
)
.await;
@@ -311,10 +499,12 @@ mod tests {
client: topic_creator.client().clone(),
table_metadata_manager,
leader_region_registry,
mailbox,
server_addr: env.server_addr().to_string(),
};
let wal_prune_procedure = WalPruneProcedure::new(topic, context);
(wal_prune_procedure, min_last_entry_id, regions_to_flush)
let wal_prune_procedure = WalPruneProcedure::new(topic, context, Some(threshold));
(wal_prune_procedure, min_flushed_entry_id, regions_to_flush)
}
fn record(i: usize) -> Record {
@@ -360,16 +550,24 @@ mod tests {
client: KafkaClientRef,
topic_name: &str,
entry_id: i64,
) -> bool {
expect_success: bool,
) {
let partition_client = client
.partition_client(topic_name, 0, UnknownTopicHandling::Retry)
.await
.unwrap();
let (records, _high_watermark) = partition_client
let res = partition_client
.fetch_records(entry_id, 0..10001, 5_000)
.await
.unwrap();
!records.is_empty()
.await;
if expect_success {
assert!(res.is_ok());
let (record, _high_watermark) = res.unwrap();
assert!(!record.is_empty());
} else {
let err = res.unwrap_err();
// The error is in a private module so we check it through `to_string()`.
assert!(err.to_string().contains("OffsetOutOfRange"));
}
}
async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
@@ -380,14 +578,34 @@ mod tests {
.unwrap();
}
async fn check_flush_request(
rx: &mut Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>,
region_ids: &[RegionId],
) {
let resp = rx.recv().await.unwrap().unwrap();
let msg = resp.mailbox_message.unwrap();
let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
let mut flush_requested_region_ids = match flush_instruction {
Instruction::FlushRegion(FlushRegions { region_ids, .. }) => region_ids,
_ => unreachable!(),
};
let sorted_region_ids = region_ids
.iter()
.cloned()
.sorted_by_key(|a| a.as_u64())
.collect::<Vec<_>>();
flush_requested_region_ids.sort_by_key(|a| a.as_u64());
assert_eq!(flush_requested_region_ids, sorted_region_ids);
}
#[tokio::test]
async fn test_procedure_execution() {
run_test_with_kafka_wal(|broker_endpoints| {
Box::pin(async {
common_telemetry::init_default_ut_logging();
let topic_name = "greptime_test_topic".to_string();
let env = TestEnv::new();
let (mut procedure, min_last_entry_id, _) =
let mut env = TestEnv::new();
let (mut procedure, min_flushed_entry_id, regions_to_flush) =
mock_test_env(topic_name.clone(), broker_endpoints, &env).await;
// Step 1: Test `on_prepare`.
@@ -399,38 +617,79 @@ mod tests {
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(procedure.data.min_flushed_entry_id, min_last_entry_id);
assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
assert_eq!(procedure.data.min_flushed_entry_id, min_flushed_entry_id);
assert_eq!(
procedure.data.regions_to_flush.len(),
regions_to_flush.len()
);
for region_id in &regions_to_flush {
assert!(procedure.data.regions_to_flush.contains(region_id));
}
// Step 2: Test `on_prune`.
// Step 2: Test `on_sending_flush_request`.
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
env.mailbox
.insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
.await;
let status = procedure.on_sending_flush_request().await.unwrap();
check_flush_request(&mut rx, &regions_to_flush).await;
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::Prune);
// Step 3: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check if the entry ids after `min_flushed_entry_id` still exist.
assert!(
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64,
)
.await
);
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64 + 1,
true,
)
.await;
// Check if the entry s before `min_flushed_entry_id` are deleted.
assert!(
procedure.data.min_flushed_entry_id == 0
|| !check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64 - 1,
)
.await
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.min_flushed_entry_id as i64,
false,
)
.await;
let min_entry_id = env
.table_metadata_manager()
.topic_name_manager()
.get(&topic_name)
.await
.unwrap()
.unwrap();
assert_eq!(
min_entry_id.pruned_entry_id,
procedure.data.min_flushed_entry_id
);
// `check_heartbeat_collected_region_ids` fails.
// Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
// Should log a warning and return `Status::Done`.
procedure.context.leader_region_registry.reset();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Step 5: Test `on_prepare`, don't flush regions.
procedure.data.trigger_flush_threshold = None;
procedure.on_prepare().await.unwrap();
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(
min_entry_id.pruned_entry_id,
procedure.data.min_flushed_entry_id
);
// Clean up the topic.
delete_topic(procedure.context.client, &topic_name).await;
})

View File

@@ -136,6 +136,8 @@ pub trait Mailbox: Send + Sync {
timeout: Duration,
) -> Result<MailboxReceiver>;
async fn send_oneway(&self, ch: &Channel, msg: MailboxMessage) -> Result<()>;
async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;