refactor: move the common part of the heartbeat response handler to common (#1627)

* refactor: move heartbeat response handler to common

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-05-24 16:55:06 +09:00
committed by GitHub
parent fa4a497d75
commit 74a6517bd0
17 changed files with 236 additions and 146 deletions

5
Cargo.lock generated
View File

@@ -1757,11 +1757,16 @@ dependencies = [
"chrono",
"common-catalog",
"common-error",
"common-runtime",
"common-telemetry",
"common-time",
"datatypes",
"serde",
"serde_json",
"snafu",
"store-api",
"table",
"tokio",
]
[[package]]

View File

@@ -8,10 +8,15 @@ license.workspace = true
api = { path = "../../api" }
common-catalog = { path = "../catalog" }
common-error = { path = "../error" }
common-runtime = { path = "../runtime" }
common-telemetry = { path = "../telemetry" }
common-time = { path = "../time" }
serde.workspace = true
snafu.workspace = true
serde_json.workspace = true
store-api = { path = "../../store-api" }
table = { path = "../../table" }
tokio.workspace = true
[dev-dependencies]
chrono.workspace = true

View File

@@ -13,11 +13,30 @@
// limitations under the License.
use common_error::prelude::*;
use serde_json::error::Error as JsonError;
use snafu::Location;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to encode object into json, source: {}", source))]
EncodeJson {
location: Location,
source: JsonError,
},
#[snafu(display("Failed to decode object from json, source: {}", source))]
DecodeJson {
location: Location,
source: JsonError,
},
#[snafu(display("Payload not exist"))]
PayloadNotExist { location: Location },
#[snafu(display("Failed to send message: {err_msg}"))]
SendMessage { err_msg: String, location: Location },
#[snafu(display("Failed to serde json, source: {}", source))]
SerdeJson {
source: serde_json::error::Error,
@@ -39,9 +58,16 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
Error::IllegalServerState { .. } => StatusCode::Internal,
Error::SerdeJson { .. } | Error::RouteInfoCorrupted { .. } => StatusCode::Unexpected,
IllegalServerState { .. } => StatusCode::Internal,
SerdeJson { .. } | RouteInfoCorrupted { .. } => StatusCode::Unexpected,
SendMessage { .. } => StatusCode::Internal,
EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => {
StatusCode::Unexpected
}
}
}

View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod handler;
pub mod mailbox;
pub mod utils;

View File

@@ -0,0 +1,98 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::HeartbeatResponse;
use common_telemetry::error;
use crate::error::Result;
use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
pub mod parse_mailbox_message;
#[cfg(test)]
mod tests;
pub type HeartbeatResponseHandlerExecutorRef = Arc<dyn HeartbeatResponseHandlerExecutor>;
pub type HeartbeatResponseHandlerRef = Arc<dyn HeartbeatResponseHandler>;
pub struct HeartbeatResponseHandlerContext {
pub mailbox: MailboxRef,
pub response: HeartbeatResponse,
pub incoming_message: Option<IncomingMessage>,
}
/// HandleControl
///
/// Controls process of handling heartbeat response.
#[derive(PartialEq)]
pub enum HandleControl {
Continue,
Done,
}
impl HeartbeatResponseHandlerContext {
pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self {
Self {
mailbox,
response,
incoming_message: None,
}
}
}
/// HeartbeatResponseHandler
///
/// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`].
///
/// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`].
pub trait HeartbeatResponseHandler: Send + Sync {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
}
pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
}
pub struct HandlerGroupExecutor {
handlers: Vec<HeartbeatResponseHandlerRef>,
}
impl HandlerGroupExecutor {
pub fn new(handlers: Vec<HeartbeatResponseHandlerRef>) -> Self {
Self { handlers }
}
}
impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
for handler in &self.handlers {
if !handler.is_acceptable(&ctx) {
continue;
}
match handler.handle(&mut ctx) {
Ok(HandleControl::Done) => break,
Ok(HandleControl::Continue) => {}
Err(e) => {
error!(e;"Error while handling: {:?}", ctx.response);
break;
}
}
}
Ok(())
}
}

View File

@@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{InstructionReply, SimpleReply};
use tokio::sync::mpsc;
use crate::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use crate::instruction::{InstructionReply, SimpleReply};
#[tokio::test]
async fn test_heartbeat_mailbox() {

View File

@@ -14,10 +14,10 @@
use std::sync::Arc;
use common_meta::instruction::{Instruction, InstructionReply};
use tokio::sync::mpsc::Sender;
use crate::error::{self, Result};
use crate::instruction::{Instruction, InstructionReply};
pub type IncomingMessage = (MessageMeta, Instruction);
pub type OutgoingMessage = (MessageMeta, InstructionReply);

View File

@@ -14,12 +14,12 @@
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::Instruction;
use common_time::util::current_time_millis;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::heartbeat::mailbox::{IncomingMessage, MessageMeta, OutgoingMessage};
use crate::instruction::Instruction;
pub fn mailbox_message_to_incoming_message(m: MailboxMessage) -> Result<IncomingMessage> {
m.payload

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub mod error;
pub mod heartbeat;
pub mod instruction;
pub mod peer;
pub mod rpc;

View File

@@ -25,14 +25,6 @@ use table::error::Error as TableError;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to check region in table: {}, source: {}", table_name, source))]
CheckRegion {
table_name: String,
#[snafu(backtrace)]
source: TableError,
region_number: RegionNumber,
},
#[snafu(display("Failed to access catalog, source: {}", source))]
AccessCatalog {
#[snafu(backtrace)]
@@ -46,6 +38,13 @@ pub enum Error {
source: catalog::error::Error,
},
#[snafu(display("Failed to register table: {}, source: {}", table_name, source))]
RegisterTable {
table_name: String,
location: Location,
source: catalog::error::Error,
},
#[snafu(display("Failed to open table: {}, source: {}", table_name, source))]
OpenTable {
table_name: String,
@@ -53,11 +52,11 @@ pub enum Error {
source: TableError,
},
#[snafu(display("Failed to register table: {}, source: {}", table_name, source))]
RegisterTable {
#[snafu(display("Failed to get table: {}, source: {}", table_name, source))]
GetTable {
table_name: String,
#[snafu(backtrace)]
source: catalog::error::Error,
location: Location,
source: TableError,
},
#[snafu(display(
@@ -73,8 +72,24 @@ pub enum Error {
source: TableError,
},
#[snafu(display("Failed to send message: {err_msg}"))]
SendMessage { err_msg: String, location: Location },
#[snafu(display(
"Failed to check region {} in table: {}, source: {}",
region_number,
table_name,
source
))]
CheckRegion {
table_name: String,
location: Location,
source: TableError,
region_number: RegionNumber,
},
#[snafu(display("Failed to handle heartbeat response, source: {}", source))]
HandleHeartbeatResponse {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to execute sql, source: {}", source))]
ExecuteSql {
@@ -128,13 +143,6 @@ pub enum Error {
source: TableError,
},
#[snafu(display("Failed to get table: {}, source: {}", table_name, source))]
GetTable {
table_name: String,
#[snafu(backtrace)]
source: TableError,
},
#[snafu(display("Failed to drop table {}, source: {}", table_name, source))]
DropTable {
table_name: String,
@@ -438,7 +446,7 @@ pub enum Error {
source: JsonError,
},
#[snafu(display("Failed to decode object into json, source: {}", source))]
#[snafu(display("Failed to decode object from json, source: {}", source))]
DecodeJson {
location: Location,
source: JsonError,
@@ -472,19 +480,14 @@ impl ErrorExt for Error {
| ExecuteStatement { source }
| ExecuteLogicalPlan { source } => source.status_code(),
OpenTable { source, .. } => source.status_code(),
RegisterTable { source, .. }
| DeregisterTable { source, .. }
| AccessCatalog { source, .. } => source.status_code(),
HandleHeartbeatResponse { source, .. } => source.status_code(),
DecodeLogicalPlan { source } => source.status_code(),
NewCatalog { source } | RegisterSchema { source } => source.status_code(),
FindTable { source, .. } => source.status_code(),
CreateTable { source, .. } | CheckRegion { source, .. } => source.status_code(),
CreateTable { source, .. } => source.status_code(),
DropTable { source, .. } => source.status_code(),
FlushTable { source, .. } => source.status_code(),
GetTable { source, .. } => source.status_code(),
CloseTable { source, .. } => source.status_code(),
Insert { source, .. } => source.status_code(),
Delete { source, .. } => source.status_code(),
@@ -524,6 +527,15 @@ impl ErrorExt for Error {
StatusCode::Unexpected
}
AccessCatalog { source, .. }
| DeregisterTable { source, .. }
| RegisterTable { source, .. } => source.status_code(),
CheckRegion { source, .. }
| OpenTable { source, .. }
| CloseTable { source, .. }
| GetTable { source, .. } => source.status_code(),
// TODO(yingwen): Further categorize http error.
StartServer { .. }
| ParseAddr { .. }
@@ -534,8 +546,7 @@ impl ErrorExt for Error {
| IncorrectInternalState { .. }
| ShutdownServer { .. }
| ShutdownInstance { .. }
| CloseTableEngine { .. }
| SendMessage { .. } => StatusCode::Internal,
| CloseTableEngine { .. } => StatusCode::Internal,
InitBackend { .. } => StatusCode::StorageUnavailable,

View File

@@ -18,23 +18,20 @@ use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, NodeStat, Peer};
use catalog::{datanode_stat, CatalogManagerRef};
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{error, info, trace, warn};
use mailbox::{HeartbeatMailbox, MailboxRef};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::time::Instant;
use self::handler::{HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef};
use self::utils::outgoing_message_to_mailbox_message;
use crate::error::{MetaClientInitSnafu, Result};
use crate::error::{self, MetaClientInitSnafu, Result};
pub mod handler;
pub mod utils;
// TODO(weny): remove allow dead_code
#[allow(dead_code)]
pub mod mailbox;
pub(crate) mod handler;
pub struct HeartbeatTask {
node_id: u64,
@@ -114,7 +111,9 @@ impl HeartbeatTask {
handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Result<()> {
trace!("heartbeat response: {:?}", ctx.response);
handler_executor.handle(ctx)
handler_executor
.handle(ctx)
.context(error::HandleHeartbeatResponseSnafu)
}
/// Start heartbeat task, spawn background task.

View File

@@ -12,89 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::HeartbeatResponse;
use common_telemetry::error;
use crate::error::Result;
use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
pub mod close_region;
pub mod open_region;
pub mod parse_mailbox_message;
#[cfg(test)]
mod tests;
pub type HeartbeatResponseHandlerExecutorRef = Arc<dyn HeartbeatResponseHandlerExecutor>;
pub type HeartbeatResponseHandlerRef = Arc<dyn HeartbeatResponseHandler>;
pub struct HeartbeatResponseHandlerContext {
pub mailbox: MailboxRef,
pub response: HeartbeatResponse,
pub incoming_message: Option<IncomingMessage>,
}
/// HandleControl
///
/// Controls process of handling heartbeat response.
#[derive(PartialEq)]
pub enum HandleControl {
Continue,
Done,
}
impl HeartbeatResponseHandlerContext {
pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self {
Self {
mailbox,
response,
incoming_message: None,
}
}
}
/// HeartbeatResponseHandler
///
/// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`].
///
/// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`].
pub trait HeartbeatResponseHandler: Send + Sync {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
}
pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
}
pub struct HandlerGroupExecutor {
handlers: Vec<HeartbeatResponseHandlerRef>,
}
impl HandlerGroupExecutor {
pub fn new(handlers: Vec<HeartbeatResponseHandlerRef>) -> Self {
Self { handlers }
}
}
impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
for handler in &self.handlers {
if !handler.is_acceptable(&ctx) {
continue;
}
match handler.handle(&mut ctx) {
Ok(HandleControl::Done) => break,
Ok(HandleControl::Continue) => {}
Err(e) => {
error!(e;"Error while handling: {:?}", ctx.response);
break;
}
}
}
Ok(())
}
}

View File

@@ -16,9 +16,12 @@ use std::sync::Arc;
use catalog::{CatalogManagerRef, DeregisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply};
use common_telemetry::{error, warn};
use log::info;
use common_telemetry::{error, info, warn};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
@@ -26,8 +29,6 @@ use table::engine::{CloseTableResult, EngineContext, TableReference};
use table::requests::CloseTableRequest;
use crate::error::{self, Result};
use crate::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
use crate::heartbeat::HeartbeatResponseHandlerContext;
#[derive(Clone)]
pub struct CloseRegionHandler {
@@ -43,7 +44,7 @@ impl HeartbeatResponseHandler for CloseRegionHandler {
)
}
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() else {
unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'");
};

View File

@@ -17,6 +17,10 @@ use std::sync::Arc;
use catalog::error::Error as CatalogError;
use catalog::{CatalogManagerRef, RegisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply};
use common_telemetry::{error, warn};
use snafu::ResultExt;
@@ -26,8 +30,6 @@ use table::engine::EngineContext;
use table::requests::OpenTableRequest;
use crate::error::{self, Result};
use crate::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
use crate::heartbeat::HeartbeatResponseHandlerContext;
#[derive(Clone)]
pub struct OpenRegionHandler {
@@ -43,7 +45,7 @@ impl HeartbeatResponseHandler for OpenRegionHandler {
)
}
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() else {
unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'");
};

View File

@@ -24,6 +24,8 @@ use common_base::paths::{CLUSTER_DIR, WAL_DIR};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::ProcedureManagerRef;
@@ -60,8 +62,6 @@ use crate::error::{
};
use crate::heartbeat::handler::close_region::CloseRegionHandler;
use crate::heartbeat::handler::open_region::OpenRegionHandler;
use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use crate::heartbeat::handler::HandlerGroupExecutor;
use crate::heartbeat::HeartbeatTask;
use crate::sql::{SqlHandler, SqlRequest};
use crate::store;

View File

@@ -20,6 +20,10 @@ use api::v1::meta::HeartbeatResponse;
use api::v1::query_request::Query;
use api::v1::QueryRequest;
use catalog::CatalogManagerRef;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply};
use common_query::Output;
use datatypes::prelude::ConcreteDataType;
@@ -31,10 +35,6 @@ use tokio::sync::mpsc::{self, Receiver};
use crate::heartbeat::handler::close_region::CloseRegionHandler;
use crate::heartbeat::handler::open_region::OpenRegionHandler;
use crate::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use crate::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use crate::instance::Instance;
pub(crate) mod test_util;
@@ -205,6 +205,15 @@ async fn parepare_handler_test(name: &str) -> HandlerTestGuard {
}
}
pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta {
MessageMeta {
id,
subject: subject.to_string(),
to: to.to_string(),
from: from.to_string(),
}
}
fn handle_instruction(
executor: Arc<dyn HeartbeatResponseHandlerExecutor>,
mailbox: Arc<HeartbeatMailbox>,
@@ -213,7 +222,7 @@ fn handle_instruction(
let response = HeartbeatResponse::default();
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((MessageMeta::new_test(1, "hi", "foo", "bar"), instruction));
ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction));
executor.handle(ctx).unwrap();
}