refactor: use generic for heartbeat instruction handler (#7149)

* refactor: use generic

Signed-off-by: discord9 <discord9@163.com>

* w

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-10-27 17:09:48 +08:00
committed by GitHub
parent 785f9d7fd7
commit f2bc92b9e6
6 changed files with 148 additions and 99 deletions

View File

@@ -47,10 +47,11 @@ pub struct RegionHeartbeatResponseHandler {
#[async_trait::async_trait]
pub trait InstructionHandler: Send + Sync {
type Instruction;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
instruction: Self::Instruction,
) -> Option<InstructionReply>;
}
@@ -93,39 +94,101 @@ impl RegionHeartbeatResponseHandler {
self
}
fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<dyn InstructionHandler>> {
fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<InstructionHandlers>> {
match instruction {
Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler)),
Instruction::OpenRegions(_) => Ok(Box::new(OpenRegionsHandler {
open_region_parallelism: self.open_region_parallelism,
})),
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler)),
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler)),
Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler)),
Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler.into())),
Instruction::OpenRegions(_) => Ok(Box::new(
OpenRegionsHandler {
open_region_parallelism: self.open_region_parallelism,
}
.into(),
)),
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler.into())),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
}
}
}
#[allow(clippy::enum_variant_names)]
pub enum InstructionHandlers {
CloseRegions(CloseRegionsHandler),
OpenRegions(OpenRegionsHandler),
FlushRegions(FlushRegionsHandler),
DowngradeRegions(DowngradeRegionsHandler),
UpgradeRegions(UpgradeRegionsHandler),
}
macro_rules! impl_from_handler {
($($handler:ident => $variant:ident),*) => {
$(
impl From<$handler> for InstructionHandlers {
fn from(handler: $handler) -> Self {
InstructionHandlers::$variant(handler)
}
}
)*
};
}
impl_from_handler!(
CloseRegionsHandler => CloseRegions,
OpenRegionsHandler => OpenRegions,
FlushRegionsHandler => FlushRegions,
DowngradeRegionsHandler => DowngradeRegions,
UpgradeRegionsHandler => UpgradeRegions
);
macro_rules! dispatch_instr {
(
$( $instr_variant:ident => $handler_variant:ident ),* $(,)?
) => {
impl InstructionHandlers {
pub async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
) -> Option<InstructionReply> {
match (self, instruction) {
$(
(
InstructionHandlers::$handler_variant(handler),
Instruction::$instr_variant(instr),
) => handler.handle(ctx, instr).await,
)*
// Safety: must be used in pairs with `build_handler`.
_ => unreachable!(),
}
}
/// Check whether this instruction is acceptable by any handler.
pub fn is_acceptable(instruction: &Instruction) -> bool {
matches!(
instruction,
$(
Instruction::$instr_variant { .. }
)|*
)
}
}
};
}
dispatch_instr!(
CloseRegions => CloseRegions,
OpenRegions => OpenRegions,
FlushRegions => FlushRegions,
DowngradeRegions => DowngradeRegions,
UpgradeRegion => UpgradeRegions,
);
#[async_trait]
impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(ctx.incoming_message.as_ref(), |Some((
_,
Instruction::DowngradeRegions { .. },
))| Some((
_,
Instruction::UpgradeRegion { .. }
)) | Some((
_,
Instruction::FlushRegions { .. }
)) | Some((
_,
Instruction::OpenRegions { .. }
)) | Some((
_,
Instruction::CloseRegions { .. }
)))
if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
return InstructionHandlers::is_acceptable(instruction);
}
false
}
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {

View File

@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::RegionIdent;
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_telemetry::warn;
use futures::future::join_all;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
@@ -26,13 +27,13 @@ pub struct CloseRegionsHandler;
#[async_trait::async_trait]
impl InstructionHandler for CloseRegionsHandler {
type Instruction = Vec<RegionIdent>;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
region_idents: Self::Instruction,
) -> Option<InstructionReply> {
// Safety: must be `Instruction::CloseRegions` instruction.
let region_idents = instruction.into_close_regions().unwrap();
let region_ids = region_idents
.into_iter()
.map(|region_ident| RegionId::new(region_ident.table_id, region_ident.region_number))

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, InstructionReply,
};
use common_telemetry::tracing::info;
use common_telemetry::{error, warn};
@@ -156,13 +156,13 @@ impl DowngradeRegionsHandler {
#[async_trait::async_trait]
impl InstructionHandler for DowngradeRegionsHandler {
type Instruction = Vec<DowngradeRegion>;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
downgrade_regions: Self::Instruction,
) -> Option<InstructionReply> {
// Safety: must be `Instruction::DowngradeRegion` instruction.
let downgrade_regions = instruction.into_downgrade_regions().unwrap();
let futures = downgrade_regions
.into_iter()
.map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
@@ -263,10 +263,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout,
}]),
}],
)
.await;
@@ -306,10 +306,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout,
}]),
}],
)
.await;
@@ -341,10 +341,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: Some(flush_timeout),
}]),
}],
)
.await;
@@ -380,10 +380,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout,
}]),
}],
)
.await;
@@ -396,10 +396,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
}]),
}],
)
.await;
// Must less than 300 ms.
@@ -443,10 +443,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout,
}]),
}],
)
.await;
let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
@@ -458,10 +458,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: Some(Duration::from_millis(500)),
}]),
}],
)
.await;
// Must less than 300 ms.
@@ -487,10 +487,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: None,
}]),
}],
)
.await;
let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
@@ -518,10 +518,10 @@ mod tests {
let reply = DowngradeRegionsHandler
.handle(
&handler_context,
Instruction::DowngradeRegions(vec![DowngradeRegion {
vec![DowngradeRegion {
region_id,
flush_timeout: None,
}]),
}],
)
.await;
let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];

View File

@@ -15,7 +15,7 @@
use std::time::Instant;
use common_meta::instruction::{
FlushErrorStrategy, FlushRegionReply, FlushStrategy, Instruction, InstructionReply,
FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
};
use common_telemetry::{debug, warn};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
@@ -28,13 +28,14 @@ pub struct FlushRegionsHandler;
#[async_trait::async_trait]
impl InstructionHandler for FlushRegionsHandler {
type Instruction = FlushRegions;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
flush_regions: FlushRegions,
) -> Option<InstructionReply> {
let start_time = Instant::now();
let flush_regions = instruction.into_flush_regions().unwrap();
let strategy = flush_regions.strategy;
let region_ids = flush_regions.region_ids;
let error_strategy = flush_regions.error_strategy;
@@ -205,10 +206,7 @@ mod tests {
// Async hint mode
let flush_instruction = FlushRegions::async_batch(region_ids.clone());
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
assert!(reply.is_none()); // Hint mode returns no reply
assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
@@ -218,10 +216,7 @@ mod tests {
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
assert!(reply.is_none());
assert!(flushed_region_ids.read().unwrap().is_empty());
@@ -247,10 +242,7 @@ mod tests {
let flush_instruction = FlushRegions::sync_single(region_id);
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
let flush_reply = reply.unwrap().expect_flush_regions_reply();
assert!(flush_reply.overall_success);
@@ -287,10 +279,7 @@ mod tests {
let flush_instruction =
FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
let flush_reply = reply.unwrap().expect_flush_regions_reply();
assert!(!flush_reply.overall_success); // Should fail due to non-existent regions
@@ -321,10 +310,7 @@ mod tests {
let flush_instruction =
FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
let reply = FlushRegionsHandler
.handle(
&handler_context,
Instruction::FlushRegions(flush_instruction),
)
.handle(&handler_context, flush_instruction)
.await;
let flush_reply = reply.unwrap().expect_flush_regions_reply();
assert!(!flush_reply.overall_success); // Should fail due to one non-existent region

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_options_allocator::prepare_wal_options;
use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest};
@@ -26,13 +26,12 @@ pub struct OpenRegionsHandler {
#[async_trait::async_trait]
impl InstructionHandler for OpenRegionsHandler {
type Instruction = Vec<OpenRegion>;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
open_regions: Self::Instruction,
) -> Option<InstructionReply> {
let open_regions = instruction.into_open_regions().unwrap();
let requests = open_regions
.into_iter()
.map(|open_region| {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_telemetry::{info, warn};
use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
@@ -24,12 +24,12 @@ pub struct UpgradeRegionsHandler;
#[async_trait::async_trait]
impl InstructionHandler for UpgradeRegionsHandler {
type Instruction = UpgradeRegion;
async fn handle(
&self,
ctx: &HandlerContext,
instruction: Instruction,
) -> Option<InstructionReply> {
let UpgradeRegion {
UpgradeRegion {
region_id,
last_entry_id,
metadata_last_entry_id,
@@ -37,8 +37,8 @@ impl InstructionHandler for UpgradeRegionsHandler {
location_id,
replay_entry_id,
metadata_replay_entry_id,
} = instruction.into_upgrade_regions().unwrap();
}: UpgradeRegion,
) -> Option<InstructionReply> {
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
@@ -138,7 +138,7 @@ impl InstructionHandler for UpgradeRegionsHandler {
mod tests {
use std::time::Duration;
use common_meta::instruction::{Instruction, UpgradeRegion};
use common_meta::instruction::UpgradeRegion;
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
@@ -164,11 +164,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
},
)
.await;
@@ -201,11 +201,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
},
)
.await;
@@ -239,11 +239,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
},
)
.await;
@@ -280,11 +280,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}),
},
)
.await;
@@ -298,11 +298,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(500)),
..Default::default()
}),
},
)
.await;
// Must less than 300 ms.
@@ -339,10 +339,10 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
..Default::default()
}),
},
)
.await;
@@ -355,11 +355,11 @@ mod tests {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
Instruction::UpgradeRegion(UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout: Some(Duration::from_millis(200)),
..Default::default()
}),
},
)
.await;