feat: add add_handler_after, add_handler_before, replace_handler (#4788)

* feat: add `add_handler_after`, `add_handler_before`, `replace_handler`

* chore: apply suggestions from CR

* test: add more tests

* feat: use `Vec` instead of `LinkedList`

* Update src/meta-srv/src/lib.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Weny Xu
2024-09-30 16:53:59 +08:00
committed by GitHub
parent 6e776d5f98
commit c1e8084af6
3 changed files with 360 additions and 65 deletions

View File

@@ -733,6 +733,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Handler not found: {}", name))]
HandlerNotFound {
name: String,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -803,7 +810,8 @@ impl ErrorExt for Error {
| Error::InitExportMetricsTask { .. }
| Error::ProcedureNotFound { .. }
| Error::TooManyPartitions { .. }
| Error::TomlFormat { .. } => StatusCode::InvalidArguments,
| Error::TomlFormat { .. }
| Error::HandlerNotFound { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::InvalidRegionKeyFromUtf8 { .. }

View File

@@ -232,17 +232,6 @@ pub struct HeartbeatHandlerGroup {
}
impl HeartbeatHandlerGroup {
pub(crate) fn new(pushers: Pushers) -> Self {
Self {
handlers: vec![],
pushers,
}
}
fn add_handler(&mut self, handler: impl HeartbeatHandler + 'static) {
self.handlers.push(NameCachedHandler::new(handler));
}
/// Registers the heartbeat response [`Pusher`] with the given key to the group.
pub async fn register_pusher(&self, key: impl AsRef<str>, pusher: Pusher) {
let key = key.as_ref();
@@ -458,25 +447,34 @@ pub struct HeartbeatHandlerGroupBuilder {
region_failure_handler: Option<RegionFailureHandler>,
/// The handler to handle region lease.
region_lease_handler: RegionLeaseHandler,
region_lease_handler: Option<RegionLeaseHandler>,
/// The plugins.
plugins: Option<Plugins>,
/// The heartbeat response pushers.
pushers: Pushers,
/// The group of heartbeat handlers.
handlers: Vec<NameCachedHandler>,
}
impl HeartbeatHandlerGroupBuilder {
pub fn new(pushers: Pushers, region_lease_handler: RegionLeaseHandler) -> Self {
pub fn new(pushers: Pushers) -> Self {
Self {
region_failure_handler: None,
region_lease_handler,
region_lease_handler: None,
plugins: None,
pushers,
handlers: vec![],
}
}
pub fn with_region_lease_handler(mut self, handler: Option<RegionLeaseHandler>) -> Self {
self.region_lease_handler = handler;
self
}
/// Sets the [`RegionFailureHandler`].
pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
self.region_failure_handler = handler;
@@ -489,10 +487,10 @@ impl HeartbeatHandlerGroupBuilder {
self
}
/// Builds the group of heartbeat handlers.
pub fn build(self) -> HeartbeatHandlerGroup {
/// Adds the default handlers.
pub fn add_default_handlers(mut self) -> Self {
// Extract the `PublishHeartbeatHandler` from the plugins.
let publish_heartbeat_handler = if let Some(plugins) = self.plugins {
let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() {
plugins
.get::<PublisherRef>()
.map(|publish| PublishHeartbeatHandler::new(publish.clone()))
@@ -500,39 +498,94 @@ impl HeartbeatHandlerGroupBuilder {
None
};
// TODO(weny): Considers classifying handlers
// to make it easier for upper layers to customize handler groups.
let mut group = HeartbeatHandlerGroup::new(self.pushers);
group.add_handler(ResponseHeaderHandler);
self.add_handler_last(ResponseHeaderHandler);
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
// because even if the current meta-server node is no longer the leader it can
// still help the datanode to keep lease.
group.add_handler(DatanodeKeepLeaseHandler);
group.add_handler(FlownodeKeepLeaseHandler);
group.add_handler(CheckLeaderHandler);
group.add_handler(OnLeaderStartHandler);
group.add_handler(ExtractStatHandler);
group.add_handler(CollectDatanodeClusterInfoHandler);
group.add_handler(CollectFrontendClusterInfoHandler);
group.add_handler(CollectFlownodeClusterInfoHandler);
group.add_handler(MailboxHandler);
group.add_handler(self.region_lease_handler);
group.add_handler(FilterInactiveRegionStatsHandler);
if let Some(region_failure_handler) = self.region_failure_handler {
group.add_handler(region_failure_handler);
self.add_handler_last(DatanodeKeepLeaseHandler);
self.add_handler_last(FlownodeKeepLeaseHandler);
self.add_handler_last(CheckLeaderHandler);
self.add_handler_last(OnLeaderStartHandler);
self.add_handler_last(ExtractStatHandler);
self.add_handler_last(CollectDatanodeClusterInfoHandler);
self.add_handler_last(CollectFrontendClusterInfoHandler);
self.add_handler_last(CollectFlownodeClusterInfoHandler);
self.add_handler_last(MailboxHandler);
if let Some(region_lease_handler) = self.region_lease_handler.take() {
self.add_handler_last(region_lease_handler);
}
self.add_handler_last(FilterInactiveRegionStatsHandler);
if let Some(region_failure_handler) = self.region_failure_handler.take() {
self.add_handler_last(region_failure_handler);
}
if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
group.add_handler(publish_heartbeat_handler);
self.add_handler_last(publish_heartbeat_handler);
}
group.add_handler(CollectStatsHandler::default());
self.add_handler_last(CollectStatsHandler::default());
group
self
}
/// Builds the group of heartbeat handlers.
pub fn build(self) -> HeartbeatHandlerGroup {
HeartbeatHandlerGroup {
handlers: self.handlers.into_iter().collect(),
pushers: self.pushers,
}
}
/// Adds the handler after the specified handler.
pub fn add_handler_after(
&mut self,
target: &'static str,
handler: impl HeartbeatHandler + 'static,
) -> Result<()> {
if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
self.handlers
.insert(pos + 1, NameCachedHandler::new(handler));
return Ok(());
}
error::HandlerNotFoundSnafu { name: target }.fail()
}
/// Adds the handler before the specified handler.
pub fn add_handler_before(
&mut self,
target: &'static str,
handler: impl HeartbeatHandler + 'static,
) -> Result<()> {
if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
self.handlers.insert(pos, NameCachedHandler::new(handler));
return Ok(());
}
error::HandlerNotFoundSnafu { name: target }.fail()
}
/// Replaces the handler with the specified name.
pub fn replace_handler(
&mut self,
target: &'static str,
handler: impl HeartbeatHandler + 'static,
) -> Result<()> {
if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
self.handlers[pos] = NameCachedHandler::new(handler);
return Ok(());
}
error::HandlerNotFoundSnafu { name: target }.fail()
}
fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
self.handlers.push(NameCachedHandler::new(handler));
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use std::time::Duration;
@@ -541,17 +594,9 @@ mod tests {
use common_meta::sequence::SequenceBuilder;
use tokio::sync::mpsc;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_cluster_info_handler::{
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
CollectFrontendClusterInfoHandler,
};
use super::{HeartbeatHandlerGroupBuilder, Pushers};
use crate::error;
use crate::handler::collect_stats_handler::CollectStatsHandler;
use crate::handler::extract_stat_handler::ExtractStatHandler;
use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::on_leader_start_handler::OnLeaderStartHandler;
use crate::handler::response_header_handler::ResponseHeaderHandler;
use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
@@ -621,24 +666,13 @@ mod tests {
(mailbox, receiver)
}
#[tokio::test]
async fn test_handler_name() {
let mut group = HeartbeatHandlerGroup::default();
group.add_handler(ResponseHeaderHandler);
group.add_handler(DatanodeKeepLeaseHandler);
group.add_handler(FlownodeKeepLeaseHandler);
group.add_handler(CheckLeaderHandler);
group.add_handler(OnLeaderStartHandler);
group.add_handler(ExtractStatHandler);
group.add_handler(CollectDatanodeClusterInfoHandler);
group.add_handler(CollectFrontendClusterInfoHandler);
group.add_handler(CollectFlownodeClusterInfoHandler);
group.add_handler(MailboxHandler);
group.add_handler(FilterInactiveRegionStatsHandler);
group.add_handler(CollectStatsHandler::default());
#[test]
fn test_handler_group_builder() {
let group = HeartbeatHandlerGroupBuilder::new(Pushers::default())
.add_default_handlers()
.build();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
let names = [
@@ -660,4 +694,255 @@ mod tests {
assert_eq!(handler.name, name);
}
}
#[test]
fn test_handler_group_builder_add_before() {
let mut builder =
HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
builder
.add_handler_before(
"FilterInactiveRegionStatsHandler",
CollectStatsHandler::default(),
)
.unwrap();
let group = builder.build();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
"FlownodeKeepLeaseHandler",
"CheckLeaderHandler",
"OnLeaderStartHandler",
"ExtractStatHandler",
"CollectDatanodeClusterInfoHandler",
"CollectFrontendClusterInfoHandler",
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
}
#[test]
fn test_handler_group_builder_add_before_first() {
let mut builder =
HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
builder
.add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default())
.unwrap();
let group = builder.build();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
let names = [
"CollectStatsHandler",
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
"FlownodeKeepLeaseHandler",
"CheckLeaderHandler",
"OnLeaderStartHandler",
"ExtractStatHandler",
"CollectDatanodeClusterInfoHandler",
"CollectFrontendClusterInfoHandler",
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
}
#[test]
fn test_handler_group_builder_add_after() {
let mut builder =
HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
builder
.add_handler_after("MailboxHandler", CollectStatsHandler::default())
.unwrap();
let group = builder.build();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
"FlownodeKeepLeaseHandler",
"CheckLeaderHandler",
"OnLeaderStartHandler",
"ExtractStatHandler",
"CollectDatanodeClusterInfoHandler",
"CollectFrontendClusterInfoHandler",
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
}
#[test]
fn test_handler_group_builder_add_after_last() {
let mut builder =
HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
builder
.add_handler_after("CollectStatsHandler", ResponseHeaderHandler)
.unwrap();
let group = builder.build();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
"FlownodeKeepLeaseHandler",
"CheckLeaderHandler",
"OnLeaderStartHandler",
"ExtractStatHandler",
"CollectDatanodeClusterInfoHandler",
"CollectFrontendClusterInfoHandler",
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"ResponseHeaderHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
}
#[test]
fn test_handler_group_builder_replace() {
let mut builder =
HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
builder
.replace_handler("MailboxHandler", CollectStatsHandler::default())
.unwrap();
let group = builder.build();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
"FlownodeKeepLeaseHandler",
"CheckLeaderHandler",
"OnLeaderStartHandler",
"ExtractStatHandler",
"CollectDatanodeClusterInfoHandler",
"CollectFrontendClusterInfoHandler",
"CollectFlownodeClusterInfoHandler",
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
}
#[test]
fn test_handler_group_builder_replace_last() {
let mut builder =
HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
builder
.replace_handler("CollectStatsHandler", ResponseHeaderHandler)
.unwrap();
let group = builder.build();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
"FlownodeKeepLeaseHandler",
"CheckLeaderHandler",
"OnLeaderStartHandler",
"ExtractStatHandler",
"CollectDatanodeClusterInfoHandler",
"CollectFrontendClusterInfoHandler",
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"ResponseHeaderHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
}
#[test]
fn test_handler_group_builder_replace_first() {
let mut builder =
HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
builder
.replace_handler("ResponseHeaderHandler", CollectStatsHandler::default())
.unwrap();
let group = builder.build();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
let names = [
"CollectStatsHandler",
"DatanodeKeepLeaseHandler",
"FlownodeKeepLeaseHandler",
"CheckLeaderHandler",
"OnLeaderStartHandler",
"ExtractStatHandler",
"CollectDatanodeClusterInfoHandler",
"CollectFrontendClusterInfoHandler",
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
}
#[test]
fn test_handler_group_builder_handler_not_found() {
let mut builder =
HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers();
let err = builder
.add_handler_before("NotExists", CollectStatsHandler::default())
.unwrap_err();
assert_matches!(err, error::Error::HandlerNotFound { .. });
let err = builder
.add_handler_after("NotExists", CollectStatsHandler::default())
.unwrap_err();
assert_matches!(err, error::Error::HandlerNotFound { .. });
let err = builder
.replace_handler("NotExists", CollectStatsHandler::default())
.unwrap_err();
assert_matches!(err, error::Error::HandlerNotFound { .. });
}
}

View File

@@ -358,9 +358,11 @@ impl MetasrvBuilder {
memory_region_keeper.clone(),
);
HeartbeatHandlerGroupBuilder::new(pushers, region_lease_handler)
HeartbeatHandlerGroupBuilder::new(pushers)
.with_plugins(plugins.clone())
.with_region_failure_handler(region_failover_handler)
.with_region_lease_handler(Some(region_lease_handler))
.add_default_handlers()
.build()
}
};