From 6e6e335a816a496301009aa3597921e13669cebe Mon Sep 17 00:00:00 2001 From: Yuhan Wang Date: Mon, 7 Apr 2025 22:05:18 +0800 Subject: [PATCH] feat(remote-wal): send flush request when pruning remote wal (#5825) * feat: update minimum entry id in kvbackend * fix: persist before delete * chore: apply comments * feat: add flush region in wal prune procedure * fix: cherry-pick error * chore: fmt * chore: drop rx to avoid block by response * chore: update comments * chore: apply review comments * test: fix unit test * feat: add option not to flush region during wal prune * test: fix unit test * fix: delete at minimum replay entry id + 1 * fix: cas * chore: add comments * chore: apply review comments * chore: apply review comments * chore: fix error msg * chore: apply review comments * fix: idempotent cas * refactor: use a one-way sender * chore: better err msg * chore: fix unit test * chore: apply review comments * chore: apply review comments * chore: replace send oneway --- src/common/meta/src/instruction.rs | 8 + src/common/meta/src/key.rs | 12 +- src/common/meta/src/key/topic_name.rs | 135 ++++-- src/datanode/src/heartbeat/handler.rs | 12 +- .../src/heartbeat/handler/close_region.rs | 14 +- .../src/heartbeat/handler/downgrade_region.rs | 83 ++-- .../src/heartbeat/handler/flush_region.rs | 104 +++++ .../src/heartbeat/handler/open_region.rs | 6 +- .../src/heartbeat/handler/upgrade_region.rs | 62 +-- src/meta-srv/src/error.rs | 23 +- src/meta-srv/src/handler.rs | 14 +- src/meta-srv/src/procedure/test_util.rs | 26 +- src/meta-srv/src/procedure/wal_prune.rs | 383 +++++++++++++++--- src/meta-srv/src/service/mailbox.rs | 2 + 14 files changed, 702 insertions(+), 182 deletions(-) create mode 100644 src/datanode/src/heartbeat/handler/flush_region.rs diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 8cfc06e882..afdc14dff0 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -192,6 +192,12 @@ pub struct DropFlow { pub flownode_ids: Vec, } +/// Flushes a batch of regions. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FlushRegions { + pub region_ids: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] pub enum Instruction { /// Opens a region. @@ -208,6 +214,8 @@ pub enum Instruction { DowngradeRegion(DowngradeRegion), /// Invalidates batch cache. InvalidateCaches(Vec), + /// Flushes regions. + FlushRegion(FlushRegions), } /// The reply of [UpgradeRegion]. diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index d89236b564..72a93a2d94 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -57,7 +57,10 @@ //! - This key is mainly used in constructing the view in Datanode and Frontend. //! //! 12. Kafka topic key: `__topic_name/kafka/{topic_name}` -//! - The key is used to mark existing topics in kafka for WAL. +//! - The key is used to track existing topics in Kafka. +//! - The value is a [TopicNameValue](crate::key::topic_name::TopicNameValue) struct; it contains the `pruned_entry_id` which represents +//! the highest entry id that has been pruned from the remote WAL. +//! - When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimum available entry id). //! //! 13. Topic name to region map key `__topic_region/{topic_name}/{region_id}` //! - Mapping {topic_name} to {region_id} @@ -137,6 +140,7 @@ use table::metadata::{RawTableInfo, TableId}; use table::table_name::TableName; use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; +use topic_name::TopicNameManager; use topic_region::{TopicRegionKey, TopicRegionManager}; use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue}; @@ -309,6 +313,7 @@ pub struct TableMetadataManager { schema_manager: SchemaManager, table_route_manager: TableRouteManager, tombstone_manager: TombstoneManager, + topic_name_manager: TopicNameManager, topic_region_manager: TopicRegionManager, kv_backend: KvBackendRef, } @@ -460,6 +465,7 @@ impl TableMetadataManager { schema_manager: SchemaManager::new(kv_backend.clone()), table_route_manager: TableRouteManager::new(kv_backend.clone()), tombstone_manager: TombstoneManager::new(kv_backend.clone()), + topic_name_manager: TopicNameManager::new(kv_backend.clone()), topic_region_manager: TopicRegionManager::new(kv_backend.clone()), kv_backend, } @@ -513,6 +519,10 @@ impl TableMetadataManager { &self.table_route_manager } + pub fn topic_name_manager(&self) -> &TopicNameManager { + &self.topic_name_manager + } + pub fn topic_region_manager(&self) -> &TopicRegionManager { &self.topic_region_manager } diff --git a/src/common/meta/src/key/topic_name.rs b/src/common/meta/src/key/topic_name.rs index dc7ca17110..4f45be7d8f 100644 --- a/src/common/meta/src/key/topic_name.rs +++ b/src/common/meta/src/key/topic_name.rs @@ -15,11 +15,14 @@ use std::fmt::{self, Display}; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result}; +use crate::ensure_values; +use crate::error::{self, DecodeJsonSnafu, Error, InvalidMetadataSnafu, Result, UnexpectedSnafu}; +use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ - MetadataKey, KAFKA_TOPIC_KEY_PATTERN, KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX, + DeserializedValueWithBytes, MetadataKey, MetadataValue, KAFKA_TOPIC_KEY_PATTERN, + KAFKA_TOPIC_KEY_PREFIX, LEGACY_TOPIC_KEY_PREFIX, }; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; @@ -31,8 +34,32 @@ pub struct TopicNameKey<'a> { pub topic: &'a str, } -#[derive(Debug, Serialize, Deserialize)] -pub struct TopicNameValue; +/// The value associated with a topic name key. +/// +/// The `pruned_entry_id` is the highest entry id that has been pruned from the remote WAL. +/// When a region uses this topic, it should start replaying entries from `pruned_entry_id + 1` (minimal available entry id). +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub struct TopicNameValue { + pub pruned_entry_id: u64, +} + +impl TopicNameValue { + pub fn new(pruned_entry_id: u64) -> Self { + Self { pruned_entry_id } + } +} + +impl MetadataValue for TopicNameValue { + fn try_from_raw_value(raw_value: &[u8]) -> Result { + let value = serde_json::from_slice::(raw_value).context(DecodeJsonSnafu)?; + Ok(value) + } + + fn try_as_raw_value(&self) -> Result> { + let raw_value = serde_json::to_vec(self).context(DecodeJsonSnafu)?; + Ok(raw_value) + } +} impl<'a> TopicNameKey<'a> { pub fn new(topic: &'a str) -> Self { @@ -114,13 +141,16 @@ impl TopicNameManager { { let topics = serde_json::from_slice::>(&kv.value).context(DecodeJsonSnafu)?; - let mut reqs = topics - .iter() - .map(|topic| { - let key = TopicNameKey::new(topic); - TxnOp::Put(key.to_bytes(), vec![]) - }) - .collect::>(); + let mut reqs = Vec::with_capacity(topics.len() + 1); + for topic in topics { + let topic_name_key = TopicNameKey::new(&topic); + let topic_name_value = TopicNameValue::new(0); + let put_req = TxnOp::Put( + topic_name_key.to_bytes(), + topic_name_value.try_as_raw_value()?, + ); + reqs.push(put_req); + } let delete_req = TxnOp::Delete(LEGACY_TOPIC_KEY_PREFIX.as_bytes().to_vec()); reqs.push(delete_req); let txn = Txn::new().and_then(reqs); @@ -129,7 +159,7 @@ impl TopicNameManager { Ok(()) } - /// Range query for topics. + /// Range query for topics. Only the keys are returned. /// Caution: this method returns keys as String instead of values of range query since the topics are stored in keys. pub async fn range(&self) -> Result> { let prefix = TopicNameKey::range_start_key(); @@ -142,25 +172,72 @@ impl TopicNameManager { .collect::>>() } - /// Put topics into kvbackend. + /// Put topics into kvbackend. The value is set to 0 by default. pub async fn batch_put(&self, topic_name_keys: Vec>) -> Result<()> { + let mut kvs = Vec::with_capacity(topic_name_keys.len()); + let topic_name_value = TopicNameValue::new(0); + for topic_name_key in &topic_name_keys { + let kv = KeyValue { + key: topic_name_key.to_bytes(), + value: topic_name_value.clone().try_as_raw_value()?, + }; + kvs.push(kv); + } let req = BatchPutRequest { - kvs: topic_name_keys - .iter() - .map(|key| KeyValue { - key: key.to_bytes(), - value: vec![], - }) - .collect(), + kvs, prev_kv: false, }; self.kv_backend.batch_put(req).await?; Ok(()) } + + /// Get value for a specific topic. + pub async fn get( + &self, + topic: &str, + ) -> Result>> { + let key = TopicNameKey::new(topic); + let raw_key = key.to_bytes(); + self.kv_backend + .get(&raw_key) + .await? + .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value)) + .transpose() + } + + /// Update the topic name key and value in the kv backend. + pub async fn update( + &self, + topic: &str, + pruned_entry_id: u64, + prev: Option>, + ) -> Result<()> { + let key = TopicNameKey::new(topic); + let raw_key = key.to_bytes(); + let value = TopicNameValue::new(pruned_entry_id); + let new_raw_value = value.try_as_raw_value()?; + let raw_value = prev.map(|v| v.get_raw_bytes()).unwrap_or_default(); + + let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value.clone()); + let mut r = self.kv_backend.txn(txn).await?; + + if !r.succeeded { + let mut set = TxnOpGetResponseSet::from(&mut r.responses); + let raw_value = TxnOpGetResponseSet::filter(raw_key)(&mut set) + .context(UnexpectedSnafu { + err_msg: "Reads the empty topic name value in comparing operation while updating TopicNameValue", + })?; + + let op_name = "updating TopicNameValue"; + ensure_values!(raw_value, new_raw_value, op_name); + } + Ok(()) + } } #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; use std::sync::Arc; use super::*; @@ -204,7 +281,19 @@ mod tests { let topics = manager.range().await.unwrap(); assert_eq!(topics, all_topics); - let topics = manager.range().await.unwrap(); - assert_eq!(topics, all_topics); + for topic in &topics { + let value = manager.get(topic).await.unwrap().unwrap(); + assert_eq!(value.pruned_entry_id, 0); + manager.update(topic, 1, Some(value.clone())).await.unwrap(); + let new_value = manager.get(topic).await.unwrap().unwrap(); + assert_eq!(new_value.pruned_entry_id, 1); + // Update twice, nothing changed + manager.update(topic, 1, Some(value.clone())).await.unwrap(); + let new_value = manager.get(topic).await.unwrap().unwrap(); + assert_eq!(new_value.pruned_entry_id, 1); + // Bad cas, emit error + let err = manager.update(topic, 3, Some(value)).await.unwrap_err(); + assert_matches!(err, error::Error::Unexpected { .. }); + } } } diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 34b568550d..319caf3c57 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -26,6 +26,7 @@ use store_api::storage::RegionId; mod close_region; mod downgrade_region; +mod flush_region; mod open_region; mod upgrade_region; @@ -42,7 +43,7 @@ pub struct RegionHeartbeatResponseHandler { /// Handler of the instruction. pub type InstructionHandler = - Box BoxFuture<'static, InstructionReply> + Send>; + Box BoxFuture<'static, Option> + Send>; #[derive(Clone)] pub struct HandlerContext { @@ -94,6 +95,9 @@ impl RegionHeartbeatResponseHandler { handler_context.handle_upgrade_region_instruction(upgrade_region) })), Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(), + Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| { + handler_context.handle_flush_region_instruction(flush_regions) + })), } } } @@ -129,8 +133,10 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { }) .await; - if let Err(e) = mailbox.send((meta, reply)).await { - error!(e; "Failed to send reply to mailbox"); + if let Some(reply) = reply { + if let Err(e) = mailbox.send((meta, reply)).await { + error!(e; "Failed to send reply to mailbox"); + } } }); diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index 1925b2df6b..77cbbf2c9c 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -26,28 +26,28 @@ impl HandlerContext { pub(crate) fn handle_close_region_instruction( self, region_ident: RegionIdent, - ) -> BoxFuture<'static, InstructionReply> { + ) -> BoxFuture<'static, Option> { Box::pin(async move { let region_id = Self::region_ident_to_region_id(®ion_ident); let request = RegionRequest::Close(RegionCloseRequest {}); let result = self.region_server.handle_request(region_id, request).await; match result { - Ok(_) => InstructionReply::CloseRegion(SimpleReply { + Ok(_) => Some(InstructionReply::CloseRegion(SimpleReply { result: true, error: None, - }), + })), Err(error::Error::RegionNotFound { .. }) => { warn!("Received a close region instruction from meta, but target region:{region_id} is not found."); - InstructionReply::CloseRegion(SimpleReply { + Some(InstructionReply::CloseRegion(SimpleReply { result: true, error: None, - }) + })) } - Err(err) => InstructionReply::CloseRegion(SimpleReply { + Err(err) => Some(InstructionReply::CloseRegion(SimpleReply { result: false, error: Some(format!("{err:?}")), - }), + })), } }) } diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs index bc7df1a171..216a460921 100644 --- a/src/datanode/src/heartbeat/handler/downgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs @@ -24,31 +24,34 @@ use crate::heartbeat::handler::HandlerContext; use crate::heartbeat::task_tracker::WaitResult; impl HandlerContext { - async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> InstructionReply { + async fn downgrade_to_follower_gracefully( + &self, + region_id: RegionId, + ) -> Option { match self .region_server .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower) .await { Ok(SetRegionRoleStateResponse::Success { last_entry_id }) => { - InstructionReply::DowngradeRegion(DowngradeRegionReply { + Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id, exists: true, error: None, - }) + })) } Ok(SetRegionRoleStateResponse::NotFound) => { - InstructionReply::DowngradeRegion(DowngradeRegionReply { + Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: false, error: None, - }) + })) } - Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply { + Err(err) => Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: true, error: Some(format!("{err:?}")), - }), + })), } } @@ -59,15 +62,15 @@ impl HandlerContext { flush_timeout, reject_write, }: DowngradeRegion, - ) -> BoxFuture<'static, InstructionReply> { + ) -> BoxFuture<'static, Option> { Box::pin(async move { let Some(writable) = self.region_server.is_region_leader(region_id) else { warn!("Region: {region_id} is not found"); - return InstructionReply::DowngradeRegion(DowngradeRegionReply { + return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: false, error: None, - }); + })); }; let region_server_moved = self.region_server.clone(); @@ -99,19 +102,19 @@ impl HandlerContext { Ok(SetRegionRoleStateResponse::Success { .. }) => {} Ok(SetRegionRoleStateResponse::NotFound) => { warn!("Region: {region_id} is not found"); - return InstructionReply::DowngradeRegion(DowngradeRegionReply { + return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: false, error: None, - }); + })); } Err(err) => { warn!(err; "Failed to convert region to downgrading leader"); - return InstructionReply::DowngradeRegion(DowngradeRegionReply { + return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: true, error: Some(format!("{err:?}")), - }); + })); } } } @@ -144,18 +147,20 @@ impl HandlerContext { let result = self.catchup_tasks.wait(&mut watcher, flush_timeout).await; match result { - WaitResult::Timeout => InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id: None, - exists: true, - error: Some(format!("Flush region: {region_id} is timeout")), - }), + WaitResult::Timeout => { + Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id: None, + exists: true, + error: Some(format!("Flush region: {region_id} is timeout")), + })) + } WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await, WaitResult::Finish(Err(err)) => { - InstructionReply::DowngradeRegion(DowngradeRegionReply { + Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { last_entry_id: None, exists: true, error: Some(format!("{err:?}")), - }) + })) } } }) @@ -196,9 +201,9 @@ mod tests { reject_write: false, }) .await; - assert_matches!(reply, InstructionReply::DowngradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply { + if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { assert!(!reply.exists); assert!(reply.error.is_none()); assert!(reply.last_entry_id.is_none()); @@ -238,9 +243,9 @@ mod tests { reject_write: false, }) .await; - assert_matches!(reply, InstructionReply::DowngradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply { + if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { assert!(reply.exists); assert!(reply.error.is_none()); assert_eq!(reply.last_entry_id.unwrap(), 1024); @@ -272,9 +277,9 @@ mod tests { reject_write: false, }) .await; - assert_matches!(reply, InstructionReply::DowngradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply { + if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { assert!(reply.exists); assert!(reply.error.unwrap().contains("timeout")); assert!(reply.last_entry_id.is_none()); @@ -310,8 +315,8 @@ mod tests { reject_write: false, }) .await; - assert_matches!(reply, InstructionReply::DowngradeRegion(_)); - if let InstructionReply::DowngradeRegion(reply) = reply { + assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); + if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { assert!(reply.exists); assert!(reply.error.unwrap().contains("timeout")); assert!(reply.last_entry_id.is_none()); @@ -325,11 +330,11 @@ mod tests { reject_write: false, }) .await; - assert_matches!(reply, InstructionReply::DowngradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); // Must less than 300 ms. assert!(timer.elapsed().as_millis() < 300); - if let InstructionReply::DowngradeRegion(reply) = reply { + if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { assert!(reply.exists); assert!(reply.error.is_none()); assert_eq!(reply.last_entry_id.unwrap(), 1024); @@ -371,8 +376,8 @@ mod tests { reject_write: false, }) .await; - assert_matches!(reply, InstructionReply::DowngradeRegion(_)); - if let InstructionReply::DowngradeRegion(reply) = reply { + assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); + if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { assert!(reply.exists); assert!(reply.error.unwrap().contains("timeout")); assert!(reply.last_entry_id.is_none()); @@ -386,11 +391,11 @@ mod tests { reject_write: false, }) .await; - assert_matches!(reply, InstructionReply::DowngradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); // Must less than 300 ms. assert!(timer.elapsed().as_millis() < 300); - if let InstructionReply::DowngradeRegion(reply) = reply { + if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { assert!(reply.exists); assert!(reply.error.unwrap().contains("flush failed")); assert!(reply.last_entry_id.is_none()); @@ -417,8 +422,8 @@ mod tests { reject_write: false, }) .await; - assert_matches!(reply, InstructionReply::DowngradeRegion(_)); - if let InstructionReply::DowngradeRegion(reply) = reply { + assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); + if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { assert!(!reply.exists); assert!(reply.error.is_none()); assert!(reply.last_entry_id.is_none()); @@ -449,8 +454,8 @@ mod tests { reject_write: false, }) .await; - assert_matches!(reply, InstructionReply::DowngradeRegion(_)); - if let InstructionReply::DowngradeRegion(reply) = reply { + assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); + if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { assert!(reply.exists); assert!(reply .error diff --git a/src/datanode/src/heartbeat/handler/flush_region.rs b/src/datanode/src/heartbeat/handler/flush_region.rs new file mode 100644 index 0000000000..04feca27a2 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/flush_region.rs @@ -0,0 +1,104 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::instruction::{FlushRegions, InstructionReply}; +use common_telemetry::warn; +use futures_util::future::BoxFuture; +use store_api::region_request::{RegionFlushRequest, RegionRequest}; + +use crate::error; +use crate::heartbeat::handler::HandlerContext; + +impl HandlerContext { + pub(crate) fn handle_flush_region_instruction( + self, + flush_regions: FlushRegions, + ) -> BoxFuture<'static, Option> { + Box::pin(async move { + for region_id in flush_regions.region_ids { + let request = RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }); + let result = self.region_server.handle_request(region_id, request).await; + + match result { + Ok(_) => {} + Err(error::Error::RegionNotFound { .. }) => { + warn!("Received a flush region instruction from meta, but target region: {region_id} is not found."); + } + Err(err) => { + warn!( + "Failed to flush region: {region_id}, error: {err}", + region_id = region_id, + err = err, + ); + } + } + } + None + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::{Arc, RwLock}; + + use common_meta::instruction::FlushRegions; + use mito2::engine::MITO_ENGINE_NAME; + use store_api::storage::RegionId; + + use super::*; + use crate::tests::{mock_region_server, MockRegionEngine}; + + #[tokio::test] + async fn test_handle_flush_region_instruction() { + let flushed_region_ids: Arc>> = Arc::new(RwLock::new(Vec::new())); + + let mock_region_server = mock_region_server(); + let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::>(); + for region_id in ®ion_ids { + let flushed_region_ids_ref = flushed_region_ids.clone(); + let (mock_engine, _) = + MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| { + region_engine.handle_request_mock_fn = + Some(Box::new(move |region_id, _request| { + flushed_region_ids_ref.write().unwrap().push(region_id); + Ok(0) + })) + }); + mock_region_server.register_test_region(*region_id, mock_engine); + } + let handler_context = HandlerContext::new_for_test(mock_region_server); + + let reply = handler_context + .clone() + .handle_flush_region_instruction(FlushRegions { + region_ids: region_ids.clone(), + }) + .await; + assert!(reply.is_none()); + assert_eq!(*flushed_region_ids.read().unwrap(), region_ids); + + flushed_region_ids.write().unwrap().clear(); + let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::>(); + let reply = handler_context + .handle_flush_region_instruction(FlushRegions { + region_ids: not_found_region_ids.clone(), + }) + .await; + assert!(reply.is_none()); + assert!(flushed_region_ids.read().unwrap().is_empty()); + } +} diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 7efab62c6e..13a1735e30 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -30,7 +30,7 @@ impl HandlerContext { region_wal_options, skip_wal_replay, }: OpenRegion, - ) -> BoxFuture<'static, InstructionReply> { + ) -> BoxFuture<'static, Option> { Box::pin(async move { let region_id = Self::region_ident_to_region_id(®ion_ident); prepare_wal_options(&mut region_options, region_id, ®ion_wal_options); @@ -43,10 +43,10 @@ impl HandlerContext { let result = self.region_server.handle_request(region_id, request).await; let success = result.is_ok(); let error = result.as_ref().map_err(|e| format!("{e:?}")).err(); - InstructionReply::OpenRegion(SimpleReply { + Some(InstructionReply::OpenRegion(SimpleReply { result: success, error, - }) + })) }) } } diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs index 9acb3da9c3..a23ae71a3d 100644 --- a/src/datanode/src/heartbeat/handler/upgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs @@ -29,22 +29,22 @@ impl HandlerContext { replay_timeout, location_id, }: UpgradeRegion, - ) -> BoxFuture<'static, InstructionReply> { + ) -> BoxFuture<'static, Option> { Box::pin(async move { let Some(writable) = self.region_server.is_region_leader(region_id) else { - return InstructionReply::UpgradeRegion(UpgradeRegionReply { + return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { ready: false, exists: false, error: None, - }); + })); }; if writable { - return InstructionReply::UpgradeRegion(UpgradeRegionReply { + return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { ready: true, exists: true, error: None, - }); + })); } let region_server_moved = self.region_server.clone(); @@ -79,11 +79,11 @@ impl HandlerContext { // Returns immediately let Some(replay_timeout) = replay_timeout else { - return InstructionReply::UpgradeRegion(UpgradeRegionReply { + return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { ready: false, exists: true, error: None, - }); + })); }; // We don't care that it returns a newly registered or running task. @@ -91,22 +91,24 @@ impl HandlerContext { let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await; match result { - WaitResult::Timeout => InstructionReply::UpgradeRegion(UpgradeRegionReply { + WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { ready: false, exists: true, error: None, - }), - WaitResult::Finish(Ok(_)) => InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: true, - exists: true, - error: None, - }), + })), + WaitResult::Finish(Ok(_)) => { + Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { + ready: true, + exists: true, + error: None, + })) + } WaitResult::Finish(Err(err)) => { - InstructionReply::UpgradeRegion(UpgradeRegionReply { + Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { ready: false, exists: true, error: Some(format!("{err:?}")), - }) + })) } } }) @@ -149,9 +151,9 @@ mod tests { location_id: None, }) .await; - assert_matches!(reply, InstructionReply::UpgradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); - if let InstructionReply::UpgradeRegion(reply) = reply { + if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() { assert!(!reply.exists); assert!(reply.error.is_none()); } @@ -187,9 +189,9 @@ mod tests { location_id: None, }) .await; - assert_matches!(reply, InstructionReply::UpgradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); - if let InstructionReply::UpgradeRegion(reply) = reply { + if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() { assert!(reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -226,9 +228,9 @@ mod tests { location_id: None, }) .await; - assert_matches!(reply, InstructionReply::UpgradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); - if let InstructionReply::UpgradeRegion(reply) = reply { + if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() { assert!(!reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -268,9 +270,9 @@ mod tests { location_id: None, }) .await; - assert_matches!(reply, InstructionReply::UpgradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); - if let InstructionReply::UpgradeRegion(reply) = reply { + if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() { assert!(!reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -286,11 +288,11 @@ mod tests { location_id: None, }) .await; - assert_matches!(reply, InstructionReply::UpgradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); // Must less than 300 ms. assert!(timer.elapsed().as_millis() < 300); - if let InstructionReply::UpgradeRegion(reply) = reply { + if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() { assert!(reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -328,10 +330,10 @@ mod tests { location_id: None, }) .await; - assert_matches!(reply, InstructionReply::UpgradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); // It didn't wait for handle returns; it had no idea about the error. - if let InstructionReply::UpgradeRegion(reply) = reply { + if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() { assert!(!reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -346,9 +348,9 @@ mod tests { location_id: None, }) .await; - assert_matches!(reply, InstructionReply::UpgradeRegion(_)); + assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_))); - if let InstructionReply::UpgradeRegion(reply) = reply { + if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() { assert!(!reply.ready); assert!(reply.exists); assert!(reply.error.is_some()); diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 4e9fa6a26c..7c45fa408f 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -802,8 +802,13 @@ pub enum Error { error: rskafka::client::error::Error, }, - #[snafu(display("Failed to delete record from Kafka"))] - DeleteRecord { + #[snafu(display( + "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}", + topic, + partition, + offset + ))] + DeleteRecords { #[snafu(implicit)] location: Location, #[snafu(source)] @@ -812,6 +817,15 @@ pub enum Error { partition: i32, offset: u64, }, + + #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))] + UpdateTopicNameValue { + topic: String, + #[snafu(implicit)] + location: Location, + #[snafu(source)] + source: common_meta::error::Error, + }, } impl Error { @@ -861,7 +875,7 @@ impl ErrorExt for Error { | Error::FlowStateHandler { .. } | Error::BuildWalOptionsAllocator { .. } | Error::BuildPartitionClient { .. } - | Error::DeleteRecord { .. } => StatusCode::Internal, + | Error::DeleteRecords { .. } => StatusCode::Internal, Error::Unsupported { .. } => StatusCode::Unsupported, @@ -924,7 +938,8 @@ impl ErrorExt for Error { | Error::TableMetadataManager { source, .. } | Error::MaintenanceModeManager { source, .. } | Error::KvBackend { source, .. } - | Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(), + | Error::UnexpectedLogicalRouteTable { source, .. } + | Error::UpdateTopicNameValue { source, .. } => source.status_code(), Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { source.status_code() diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 3336ab2a0f..3cba10d7d6 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -444,7 +444,7 @@ impl Mailbox for HeartbeatMailbox { let (tx, rx) = oneshot::channel(); let _ = self.senders.insert(message_id, tx); let deadline = Instant::now() + timeout; - let _ = self.timeouts.insert(message_id, deadline); + self.timeouts.insert(message_id, deadline); self.timeout_notify.notify_one(); self.pushers.push(pusher_id, msg).await?; @@ -452,6 +452,18 @@ impl Mailbox for HeartbeatMailbox { Ok(MailboxReceiver::new(message_id, rx, *ch)) } + async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> { + let message_id = 0; // one-way message, same as `broadcast` + msg.id = message_id; + + let pusher_id = ch.pusher_id(); + debug!("Sending mailbox message {msg:?} to {pusher_id}"); + + self.pushers.push(pusher_id, msg).await?; + + Ok(()) + } + async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> { self.pushers.broadcast(ch.pusher_range(), msg).await } diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 1b3616a8be..61254b133b 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -21,6 +21,7 @@ use common_meta::instruction::{ }; use common_meta::key::table_route::TableRouteValue; use common_meta::key::test_utils::new_test_table_info; +use common_meta::key::topic_name::TopicNameKey; use common_meta::key::TableMetadataManagerRef; use common_meta::peer::Peer; use common_meta::region_registry::{ @@ -177,7 +178,8 @@ pub async fn new_wal_prune_metadata( ) -> (EntryId, Vec) { let datanode_id = 1; let from_peer = Peer::empty(datanode_id); - let mut min_last_entry_id = 0; + let mut min_flushed_entry_id = u64::MAX; + let mut max_flushed_entry_id = 0; let mut region_entry_ids = HashMap::with_capacity(n_table as usize * n_region as usize); for table_id in 0..n_table { let region_ids = (0..n_region) @@ -208,36 +210,42 @@ pub async fn new_wal_prune_metadata( ) .await .unwrap(); + table_metadata_manager + .topic_name_manager() + .batch_put(vec![TopicNameKey::new(&topic)]) + .await + .unwrap(); let current_region_entry_ids = region_ids .iter() .map(|region_id| { let rand_n = rand::random::() as usize; - let current_last_entry_id = offsets[rand_n % offsets.len()] as u64; - min_last_entry_id = min_last_entry_id.min(current_last_entry_id); - (*region_id, current_last_entry_id) + let current_flushed_entry_id = offsets[rand_n % offsets.len()] as u64; + min_flushed_entry_id = min_flushed_entry_id.min(current_flushed_entry_id); + max_flushed_entry_id = max_flushed_entry_id.max(current_flushed_entry_id); + (*region_id, current_flushed_entry_id) }) .collect::>(); region_entry_ids.extend(current_region_entry_ids.clone()); - update_in_memory_region_last_entry_id(&leader_region_registry, current_region_entry_ids) + update_in_memory_region_flushed_entry_id(&leader_region_registry, current_region_entry_ids) .await .unwrap(); } let regions_to_flush = region_entry_ids .iter() - .filter_map(|(region_id, last_entry_id)| { - if last_entry_id - min_last_entry_id > threshold { + .filter_map(|(region_id, flushed_entry_id)| { + if max_flushed_entry_id - flushed_entry_id > threshold { Some(*region_id) } else { None } }) .collect::>(); - (min_last_entry_id, regions_to_flush) + (min_flushed_entry_id, regions_to_flush) } -pub async fn update_in_memory_region_last_entry_id( +pub async fn update_in_memory_region_flushed_entry_id( leader_region_registry: &LeaderRegionRegistryRef, region_entry_ids: HashMap, ) -> Result<()> { diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs index 3b93552845..b7f145fffc 100644 --- a/src/meta-srv/src/procedure/wal_prune.rs +++ b/src/meta-srv/src/procedure/wal_prune.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; +use api::v1::meta::MailboxMessage; use common_error::ext::BoxedError; +use common_meta::instruction::{FlushRegions, Instruction}; use common_meta::key::TableMetadataManagerRef; use common_meta::lock_key::RemoteWalLock; +use common_meta::peer::Peer; use common_meta::region_registry::LeaderRegionRegistryRef; use common_procedure::error::ToJsonSnafu; use common_procedure::{ @@ -25,6 +29,7 @@ use common_procedure::{ Result as ProcedureResult, Status, StringKey, }; use common_telemetry::warn; +use itertools::{Itertools, MinMaxResult}; use log_store::kafka::DEFAULT_PARTITION; use rskafka::client::partition::UnknownTopicHandling; use rskafka::client::Client; @@ -33,17 +38,23 @@ use snafu::ResultExt; use store_api::logstore::EntryId; use store_api::storage::RegionId; -use crate::error::{self, BuildPartitionClientSnafu, DeleteRecordSnafu, TableMetadataManagerSnafu}; +use crate::error::{ + self, BuildPartitionClientSnafu, DeleteRecordsSnafu, TableMetadataManagerSnafu, + UpdateTopicNameValueSnafu, +}; +use crate::service::mailbox::{Channel, MailboxRef}; use crate::Result; type KafkaClientRef = Arc; -const TIMEOUT: i32 = 1000; +/// No timeout for flush request. +const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(1); /// The state of WAL pruning. #[derive(Debug, Serialize, Deserialize)] pub enum WalPruneState { Prepare, + FlushRegion, Prune, } @@ -52,7 +63,12 @@ pub struct Context { client: KafkaClientRef, /// The table metadata manager. table_metadata_manager: TableMetadataManagerRef, + /// The leader region registry. leader_region_registry: LeaderRegionRegistryRef, + /// Server address of metasrv. + server_addr: String, + /// The mailbox to send messages. + mailbox: MailboxRef, } /// The data of WAL pruning. @@ -61,8 +77,10 @@ pub struct WalPruneData { /// The topic name to prune. pub topic: String, /// The minimum flush entry id for topic, which is used to prune the WAL. - /// If the topic has no region, the value is set to `None`. pub min_flushed_entry_id: EntryId, + pub regions_to_flush: Vec, + /// If `flushed_entry_id` + `trigger_flush_threshold` < `max_flushed_entry_id`, send a flush request to the region. + pub trigger_flush_threshold: Option, /// The state. pub state: WalPruneState, } @@ -76,11 +94,13 @@ pub struct WalPruneProcedure { impl WalPruneProcedure { const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune"; - pub fn new(topic: String, context: Context) -> Self { + pub fn new(topic: String, context: Context, trigger_flush_threshold: Option) -> Self { Self { data: WalPruneData { topic, min_flushed_entry_id: 0, + trigger_flush_threshold, + regions_to_flush: vec![], state: WalPruneState::Prepare, }, context, @@ -92,7 +112,64 @@ impl WalPruneProcedure { Ok(Self { data, context }) } - /// Calculate the last entry id to prune for each topic. + async fn build_peer_to_region_ids_map( + &self, + ctx: &Context, + region_ids: &[RegionId], + ) -> Result>> { + let table_ids = region_ids + .iter() + .map(|region_id| region_id.table_id()) + .collect::>() + .into_iter() + .collect::>(); + let table_ids_table_routes_map = ctx + .table_metadata_manager + .table_route_manager() + .batch_get_physical_table_routes(&table_ids) + .await + .context(TableMetadataManagerSnafu)?; + + let mut peer_region_ids_map = HashMap::new(); + for region_id in region_ids { + let table_id = region_id.table_id(); + let table_route = match table_ids_table_routes_map.get(&table_id) { + Some(route) => route, + None => return error::TableRouteNotFoundSnafu { table_id }.fail(), + }; + for region_route in &table_route.region_routes { + if region_route.region.id != *region_id { + continue; + } + if let Some(peer) = ®ion_route.leader_peer { + peer_region_ids_map + .entry(peer.clone()) + .or_insert_with(Vec::new) + .push(*region_id); + } + } + } + Ok(peer_region_ids_map) + } + + fn build_flush_region_instruction( + &self, + peer_region_ids_map: HashMap>, + ) -> Result> { + let peer_and_instructions = peer_region_ids_map + .into_iter() + .map(|(peer, region_ids)| { + let flush_instruction = Instruction::FlushRegion(FlushRegions { region_ids }); + (peer.clone(), flush_instruction) + }) + .collect(); + Ok(peer_and_instructions) + } + + /// Prepare the entry id to prune and regions to flush. + /// + /// Retry: + /// - Failed to retrieve any metadata. pub async fn on_prepare(&mut self) -> Result { let region_ids = self .context @@ -116,10 +193,6 @@ impl WalPruneProcedure { }) .collect(); - if region_ids.is_empty() { - // No regions to prune. - return Ok(Status::done()); - } // Check if the `flush_entry_ids_map` contains all region ids. let non_collected_region_ids = check_heartbeat_collected_region_ids(®ion_ids, &flush_entry_ids_map); @@ -131,15 +204,64 @@ impl WalPruneProcedure { return Ok(Status::done()); } - // Safety: `flush_entry_ids_map` are not empty. - self.data.min_flushed_entry_id = *(flush_entry_ids_map.values().min().unwrap()); + let min_max_result = flush_entry_ids_map.values().minmax(); + let max_flushed_entry_id = match min_max_result { + MinMaxResult::NoElements => { + return Ok(Status::done()); + } + MinMaxResult::OneElement(flushed_entry_id) => { + self.data.min_flushed_entry_id = *flushed_entry_id; + *flushed_entry_id + } + MinMaxResult::MinMax(min_flushed_entry_id, max_flushed_entry_id) => { + self.data.min_flushed_entry_id = *min_flushed_entry_id; + *max_flushed_entry_id + } + }; + if let Some(threshold) = self.data.trigger_flush_threshold { + for (region_id, flushed_entry_id) in flush_entry_ids_map { + if flushed_entry_id + threshold < max_flushed_entry_id { + self.data.regions_to_flush.push(region_id); + } + } + self.data.state = WalPruneState::FlushRegion; + } else { + self.data.state = WalPruneState::Prune; + } + Ok(Status::executing(true)) + } + + /// Send the flush request to regions with low flush entry id. + pub async fn on_sending_flush_request(&mut self) -> Result { + let peer_to_region_ids_map = self + .build_peer_to_region_ids_map(&self.context, &self.data.regions_to_flush) + .await?; + let flush_instructions = self.build_flush_region_instruction(peer_to_region_ids_map)?; + for (peer, flush_instruction) in flush_instructions.into_iter() { + let msg = MailboxMessage::json_message( + &format!("Flush regions: {}", flush_instruction), + &format!("Metasrv@{}", self.context.server_addr), + &format!("Datanode-{}@{}", peer.id, peer.addr), + common_time::util::current_time_millis(), + &flush_instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: flush_instruction.to_string(), + })?; + let ch = Channel::Datanode(peer.id); + self.context.mailbox.send_oneway(&ch, msg).await?; + } self.data.state = WalPruneState::Prune; Ok(Status::executing(true)) } - /// Prune the WAL. + /// Prune the WAL and persist the minimum flushed entry id. + /// + /// Retry: + /// - Failed to update the minimum flushed entry id in kvbackend. + /// - Failed to delete records. pub async fn on_prune(&mut self) -> Result { - // Safety: last_entry_ids are loaded in on_prepare. + // Safety: flushed_entry_ids are loaded in on_prepare. let partition_client = self .context .client @@ -154,20 +276,54 @@ impl WalPruneProcedure { partition: DEFAULT_PARTITION, })?; - partition_client - .delete_records(self.data.min_flushed_entry_id as i64, TIMEOUT) + // Should update the min flushed entry id in the kv backend before deleting records. + // Otherwise, when a datanode restarts, it will not be able to find the wal entries. + let prev = self + .context + .table_metadata_manager + .topic_name_manager() + .get(&self.data.topic) .await - .context(DeleteRecordSnafu { - topic: self.data.topic.clone(), - partition: DEFAULT_PARTITION, - offset: self.data.min_flushed_entry_id, + .context(TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get TopicNameValue, topic: {}", self.data.topic), + })?; + self.context + .table_metadata_manager + .topic_name_manager() + .update(&self.data.topic, self.data.min_flushed_entry_id, prev) + .await + .context(UpdateTopicNameValueSnafu { + topic: &self.data.topic, }) .map_err(BoxedError::new) .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: "Failed to delete records", + reason: format!( + "Failed to update pruned entry id for topic: {}", + self.data.topic + ), + })?; + partition_client + .delete_records( + (self.data.min_flushed_entry_id + 1) as i64, + DELETE_RECORDS_TIMEOUT.as_millis() as i32, + ) + .await + .context(DeleteRecordsSnafu { + topic: &self.data.topic, + partition: DEFAULT_PARTITION, + offset: (self.data.min_flushed_entry_id + 1), + }) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!( + "Failed to delete records for topic: {}, partition: {}, offset: {}", + self.data.topic, + DEFAULT_PARTITION, + self.data.min_flushed_entry_id + 1 + ), })?; - - // TODO(CookiePie): Persist the minimum flushed entry id to the table metadata manager. Ok(Status::done()) } } @@ -187,6 +343,7 @@ impl Procedure for WalPruneProcedure { match state { WalPruneState::Prepare => self.on_prepare().await, + WalPruneState::FlushRegion => self.on_sending_flush_request().await, WalPruneState::Prune => self.on_prune().await, } .map_err(|e| { @@ -230,21 +387,27 @@ fn check_heartbeat_collected_region_ids( mod tests { use std::assert_matches::assert_matches; + use api::v1::meta::HeartbeatResponse; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::region_registry::LeaderRegionRegistry; + use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::build_kafka_topic_creator; use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; use common_wal::config::kafka::MetasrvKafkaConfig; use common_wal::test_util::run_test_with_kafka_wal; use rskafka::record::Record; + use tokio::sync::mpsc::Receiver; use super::*; - use crate::procedure::test_util::new_wal_prune_metadata; + use crate::handler::HeartbeatMailbox; + use crate::procedure::test_util::{new_wal_prune_metadata, MailboxContext}; struct TestEnv { table_metadata_manager: TableMetadataManagerRef, leader_region_registry: LeaderRegionRegistryRef, + mailbox: MailboxContext, + server_addr: String, } impl TestEnv { @@ -252,9 +415,16 @@ mod tests { let kv_backend = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); let leader_region_registry = Arc::new(LeaderRegionRegistry::new()); + let mailbox_sequence = + SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); + + let mailbox_ctx = MailboxContext::new(mailbox_sequence); + Self { table_metadata_manager, leader_region_registry, + mailbox: mailbox_ctx, + server_addr: "localhost".to_string(), } } @@ -265,14 +435,21 @@ mod tests { fn leader_region_registry(&self) -> &LeaderRegionRegistryRef { &self.leader_region_registry } + + fn mailbox_context(&self) -> &MailboxContext { + &self.mailbox + } + + fn server_addr(&self) -> &str { + &self.server_addr + } } /// Mock a test env for testing. /// Including: - /// 1. Create a test env with a mailbox, a table metadata manager and a in-memory kv backend. - /// 2. Prepare some data in the table metadata manager and in-memory kv backend. - /// 3. Generate a `WalPruneProcedure` with the test env. - /// 4. Return the test env, the procedure, the minimum last entry id to prune and the regions to flush. + /// 1. Prepare some data in the table metadata manager and in-memory kv backend. + /// 2. Generate a `WalPruneProcedure` with the test env. + /// 3. Return the procedure, the minimum last entry id to prune and the regions to flush. async fn mock_test_env( topic: String, broker_endpoints: Vec, @@ -294,15 +471,26 @@ mod tests { let topic_creator = build_kafka_topic_creator(&config).await.unwrap(); let table_metadata_manager = env.table_metadata_manager().clone(); let leader_region_registry = env.leader_region_registry().clone(); - let offsets = mock_wal_entries(topic_creator.client().clone(), &topic, 10).await; + let mailbox = env.mailbox_context().mailbox().clone(); - let (min_last_entry_id, regions_to_flush) = new_wal_prune_metadata( + let n_region = 10; + let n_table = 5; + let threshold = 10; + // 5 entries per region. + let offsets = mock_wal_entries( + topic_creator.client().clone(), + &topic, + (n_region * n_table * 5) as usize, + ) + .await; + + let (min_flushed_entry_id, regions_to_flush) = new_wal_prune_metadata( table_metadata_manager.clone(), leader_region_registry.clone(), - 10, - 5, + n_region, + n_table, &offsets, - 10, + threshold, topic.clone(), ) .await; @@ -311,10 +499,12 @@ mod tests { client: topic_creator.client().clone(), table_metadata_manager, leader_region_registry, + mailbox, + server_addr: env.server_addr().to_string(), }; - let wal_prune_procedure = WalPruneProcedure::new(topic, context); - (wal_prune_procedure, min_last_entry_id, regions_to_flush) + let wal_prune_procedure = WalPruneProcedure::new(topic, context, Some(threshold)); + (wal_prune_procedure, min_flushed_entry_id, regions_to_flush) } fn record(i: usize) -> Record { @@ -360,16 +550,24 @@ mod tests { client: KafkaClientRef, topic_name: &str, entry_id: i64, - ) -> bool { + expect_success: bool, + ) { let partition_client = client .partition_client(topic_name, 0, UnknownTopicHandling::Retry) .await .unwrap(); - let (records, _high_watermark) = partition_client + let res = partition_client .fetch_records(entry_id, 0..10001, 5_000) - .await - .unwrap(); - !records.is_empty() + .await; + if expect_success { + assert!(res.is_ok()); + let (record, _high_watermark) = res.unwrap(); + assert!(!record.is_empty()); + } else { + let err = res.unwrap_err(); + // The error is in a private module so we check it through `to_string()`. + assert!(err.to_string().contains("OffsetOutOfRange")); + } } async fn delete_topic(client: KafkaClientRef, topic_name: &str) { @@ -380,14 +578,34 @@ mod tests { .unwrap(); } + async fn check_flush_request( + rx: &mut Receiver>, + region_ids: &[RegionId], + ) { + let resp = rx.recv().await.unwrap().unwrap(); + let msg = resp.mailbox_message.unwrap(); + let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap(); + let mut flush_requested_region_ids = match flush_instruction { + Instruction::FlushRegion(FlushRegions { region_ids, .. }) => region_ids, + _ => unreachable!(), + }; + let sorted_region_ids = region_ids + .iter() + .cloned() + .sorted_by_key(|a| a.as_u64()) + .collect::>(); + flush_requested_region_ids.sort_by_key(|a| a.as_u64()); + assert_eq!(flush_requested_region_ids, sorted_region_ids); + } + #[tokio::test] async fn test_procedure_execution() { run_test_with_kafka_wal(|broker_endpoints| { Box::pin(async { common_telemetry::init_default_ut_logging(); let topic_name = "greptime_test_topic".to_string(); - let env = TestEnv::new(); - let (mut procedure, min_last_entry_id, _) = + let mut env = TestEnv::new(); + let (mut procedure, min_flushed_entry_id, regions_to_flush) = mock_test_env(topic_name.clone(), broker_endpoints, &env).await; // Step 1: Test `on_prepare`. @@ -399,38 +617,79 @@ mod tests { clean_poisons: false } ); - assert_matches!(procedure.data.state, WalPruneState::Prune); - assert_eq!(procedure.data.min_flushed_entry_id, min_last_entry_id); + assert_matches!(procedure.data.state, WalPruneState::FlushRegion); + assert_eq!(procedure.data.min_flushed_entry_id, min_flushed_entry_id); + assert_eq!( + procedure.data.regions_to_flush.len(), + regions_to_flush.len() + ); + for region_id in ®ions_to_flush { + assert!(procedure.data.regions_to_flush.contains(region_id)); + } - // Step 2: Test `on_prune`. + // Step 2: Test `on_sending_flush_request`. + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + env.mailbox + .insert_heartbeat_response_receiver(Channel::Datanode(1), tx) + .await; + let status = procedure.on_sending_flush_request().await.unwrap(); + check_flush_request(&mut rx, ®ions_to_flush).await; + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); + assert_matches!(procedure.data.state, WalPruneState::Prune); + + // Step 3: Test `on_prune`. let status = procedure.on_prune().await.unwrap(); assert_matches!(status, Status::Done { output: None }); // Check if the entry ids after `min_flushed_entry_id` still exist. - assert!( - check_entry_id_existence( - procedure.context.client.clone(), - &topic_name, - procedure.data.min_flushed_entry_id as i64, - ) - .await - ); + check_entry_id_existence( + procedure.context.client.clone(), + &topic_name, + procedure.data.min_flushed_entry_id as i64 + 1, + true, + ) + .await; // Check if the entry s before `min_flushed_entry_id` are deleted. - assert!( - procedure.data.min_flushed_entry_id == 0 - || !check_entry_id_existence( - procedure.context.client.clone(), - &topic_name, - procedure.data.min_flushed_entry_id as i64 - 1, - ) - .await + check_entry_id_existence( + procedure.context.client.clone(), + &topic_name, + procedure.data.min_flushed_entry_id as i64, + false, + ) + .await; + + let min_entry_id = env + .table_metadata_manager() + .topic_name_manager() + .get(&topic_name) + .await + .unwrap() + .unwrap(); + assert_eq!( + min_entry_id.pruned_entry_id, + procedure.data.min_flushed_entry_id ); - // `check_heartbeat_collected_region_ids` fails. + // Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails. // Should log a warning and return `Status::Done`. procedure.context.leader_region_registry.reset(); let status = procedure.on_prepare().await.unwrap(); assert_matches!(status, Status::Done { output: None }); + // Step 5: Test `on_prepare`, don't flush regions. + procedure.data.trigger_flush_threshold = None; + procedure.on_prepare().await.unwrap(); + assert_matches!(procedure.data.state, WalPruneState::Prune); + assert_eq!( + min_entry_id.pruned_entry_id, + procedure.data.min_flushed_entry_id + ); + // Clean up the topic. delete_topic(procedure.context.client, &topic_name).await; }) diff --git a/src/meta-srv/src/service/mailbox.rs b/src/meta-srv/src/service/mailbox.rs index 3350de2f63..90c96b9381 100644 --- a/src/meta-srv/src/service/mailbox.rs +++ b/src/meta-srv/src/service/mailbox.rs @@ -136,6 +136,8 @@ pub trait Mailbox: Send + Sync { timeout: Duration, ) -> Result; + async fn send_oneway(&self, ch: &Channel, msg: MailboxMessage) -> Result<()>; + async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>; async fn on_recv(&self, id: MessageId, maybe_msg: Result) -> Result<()>;