From 74a6517bd0c464b44b18865f20f0b97efed9f0a1 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 24 May 2023 16:55:06 +0900 Subject: [PATCH] refactor: move the common part of the heartbeat response handler to common (#1627) * refactor: move heartbeat response handler to common * chore: apply suggestions from CR --- Cargo.lock | 5 + src/common/meta/Cargo.toml | 5 + src/common/meta/src/error.rs | 30 +++++- src/common/meta/src/heartbeat.rs | 17 ++++ src/common/meta/src/heartbeat/handler.rs | 98 +++++++++++++++++++ .../handler/parse_mailbox_message.rs | 0 .../meta}/src/heartbeat/handler/tests.rs | 2 +- .../meta}/src/heartbeat/mailbox.rs | 2 +- .../meta}/src/heartbeat/utils.rs | 2 +- src/common/meta/src/lib.rs | 1 + src/datanode/src/error.rs | 73 ++++++++------ src/datanode/src/heartbeat.rs | 21 ++-- src/datanode/src/heartbeat/handler.rs | 84 ---------------- .../src/heartbeat/handler/close_region.rs | 11 ++- .../src/heartbeat/handler/open_region.rs | 8 +- src/datanode/src/instance.rs | 4 +- src/datanode/src/tests.rs | 19 +++- 17 files changed, 236 insertions(+), 146 deletions(-) create mode 100644 src/common/meta/src/heartbeat.rs create mode 100644 src/common/meta/src/heartbeat/handler.rs rename src/{datanode => common/meta}/src/heartbeat/handler/parse_mailbox_message.rs (100%) rename src/{datanode => common/meta}/src/heartbeat/handler/tests.rs (94%) rename src/{datanode => common/meta}/src/heartbeat/mailbox.rs (96%) rename src/{datanode => common/meta}/src/heartbeat/utils.rs (97%) diff --git a/Cargo.lock b/Cargo.lock index 550f189efa..1fcf4d763e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1757,11 +1757,16 @@ dependencies = [ "chrono", "common-catalog", "common-error", + "common-runtime", + "common-telemetry", + "common-time", "datatypes", "serde", "serde_json", "snafu", + "store-api", "table", + "tokio", ] [[package]] diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index efbcb19aa8..ea5fc18f28 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -8,10 +8,15 @@ license.workspace = true api = { path = "../../api" } common-catalog = { path = "../catalog" } common-error = { path = "../error" } +common-runtime = { path = "../runtime" } +common-telemetry = { path = "../telemetry" } +common-time = { path = "../time" } serde.workspace = true snafu.workspace = true serde_json.workspace = true +store-api = { path = "../../store-api" } table = { path = "../../table" } +tokio.workspace = true [dev-dependencies] chrono.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 187ff4950c..20c9edbb7c 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -13,11 +13,30 @@ // limitations under the License. use common_error::prelude::*; +use serde_json::error::Error as JsonError; use snafu::Location; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to encode object into json, source: {}", source))] + EncodeJson { + location: Location, + source: JsonError, + }, + + #[snafu(display("Failed to decode object from json, source: {}", source))] + DecodeJson { + location: Location, + source: JsonError, + }, + + #[snafu(display("Payload not exist"))] + PayloadNotExist { location: Location }, + + #[snafu(display("Failed to send message: {err_msg}"))] + SendMessage { err_msg: String, location: Location }, + #[snafu(display("Failed to serde json, source: {}", source))] SerdeJson { source: serde_json::error::Error, @@ -39,9 +58,16 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { + use Error::*; match self { - Error::IllegalServerState { .. } => StatusCode::Internal, - Error::SerdeJson { .. } | Error::RouteInfoCorrupted { .. } => StatusCode::Unexpected, + IllegalServerState { .. } => StatusCode::Internal, + SerdeJson { .. } | RouteInfoCorrupted { .. } => StatusCode::Unexpected, + + SendMessage { .. } => StatusCode::Internal, + + EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => { + StatusCode::Unexpected + } } } diff --git a/src/common/meta/src/heartbeat.rs b/src/common/meta/src/heartbeat.rs new file mode 100644 index 0000000000..cae5c83145 --- /dev/null +++ b/src/common/meta/src/heartbeat.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod handler; +pub mod mailbox; +pub mod utils; diff --git a/src/common/meta/src/heartbeat/handler.rs b/src/common/meta/src/heartbeat/handler.rs new file mode 100644 index 0000000000..567a792134 --- /dev/null +++ b/src/common/meta/src/heartbeat/handler.rs @@ -0,0 +1,98 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::meta::HeartbeatResponse; +use common_telemetry::error; + +use crate::error::Result; +use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef}; + +pub mod parse_mailbox_message; +#[cfg(test)] +mod tests; + +pub type HeartbeatResponseHandlerExecutorRef = Arc; +pub type HeartbeatResponseHandlerRef = Arc; + +pub struct HeartbeatResponseHandlerContext { + pub mailbox: MailboxRef, + pub response: HeartbeatResponse, + pub incoming_message: Option, +} + +/// HandleControl +/// +/// Controls process of handling heartbeat response. +#[derive(PartialEq)] +pub enum HandleControl { + Continue, + Done, +} + +impl HeartbeatResponseHandlerContext { + pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self { + Self { + mailbox, + response, + incoming_message: None, + } + } +} + +/// HeartbeatResponseHandler +/// +/// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`]. +/// +/// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`]. +pub trait HeartbeatResponseHandler: Send + Sync { + fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool; + + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result; +} + +pub trait HeartbeatResponseHandlerExecutor: Send + Sync { + fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>; +} + +pub struct HandlerGroupExecutor { + handlers: Vec, +} + +impl HandlerGroupExecutor { + pub fn new(handlers: Vec) -> Self { + Self { handlers } + } +} + +impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor { + fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> { + for handler in &self.handlers { + if !handler.is_acceptable(&ctx) { + continue; + } + + match handler.handle(&mut ctx) { + Ok(HandleControl::Done) => break, + Ok(HandleControl::Continue) => {} + Err(e) => { + error!(e;"Error while handling: {:?}", ctx.response); + break; + } + } + } + Ok(()) + } +} diff --git a/src/datanode/src/heartbeat/handler/parse_mailbox_message.rs b/src/common/meta/src/heartbeat/handler/parse_mailbox_message.rs similarity index 100% rename from src/datanode/src/heartbeat/handler/parse_mailbox_message.rs rename to src/common/meta/src/heartbeat/handler/parse_mailbox_message.rs diff --git a/src/datanode/src/heartbeat/handler/tests.rs b/src/common/meta/src/heartbeat/handler/tests.rs similarity index 94% rename from src/datanode/src/heartbeat/handler/tests.rs rename to src/common/meta/src/heartbeat/handler/tests.rs index 40708f18b1..3313efe6f1 100644 --- a/src/datanode/src/heartbeat/handler/tests.rs +++ b/src/common/meta/src/heartbeat/handler/tests.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_meta::instruction::{InstructionReply, SimpleReply}; use tokio::sync::mpsc; use crate::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; +use crate::instruction::{InstructionReply, SimpleReply}; #[tokio::test] async fn test_heartbeat_mailbox() { diff --git a/src/datanode/src/heartbeat/mailbox.rs b/src/common/meta/src/heartbeat/mailbox.rs similarity index 96% rename from src/datanode/src/heartbeat/mailbox.rs rename to src/common/meta/src/heartbeat/mailbox.rs index 3a00ec3622..944a423c4c 100644 --- a/src/datanode/src/heartbeat/mailbox.rs +++ b/src/common/meta/src/heartbeat/mailbox.rs @@ -14,10 +14,10 @@ use std::sync::Arc; -use common_meta::instruction::{Instruction, InstructionReply}; use tokio::sync::mpsc::Sender; use crate::error::{self, Result}; +use crate::instruction::{Instruction, InstructionReply}; pub type IncomingMessage = (MessageMeta, Instruction); pub type OutgoingMessage = (MessageMeta, InstructionReply); diff --git a/src/datanode/src/heartbeat/utils.rs b/src/common/meta/src/heartbeat/utils.rs similarity index 97% rename from src/datanode/src/heartbeat/utils.rs rename to src/common/meta/src/heartbeat/utils.rs index 2deb09800f..afee357372 100644 --- a/src/datanode/src/heartbeat/utils.rs +++ b/src/common/meta/src/heartbeat/utils.rs @@ -14,12 +14,12 @@ use api::v1::meta::mailbox_message::Payload; use api::v1::meta::MailboxMessage; -use common_meta::instruction::Instruction; use common_time::util::current_time_millis; use snafu::{OptionExt, ResultExt}; use crate::error::{self, Result}; use crate::heartbeat::mailbox::{IncomingMessage, MessageMeta, OutgoingMessage}; +use crate::instruction::Instruction; pub fn mailbox_message_to_incoming_message(m: MailboxMessage) -> Result { m.payload diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 35ce7a7d58..979ceeb0ef 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod error; +pub mod heartbeat; pub mod instruction; pub mod peer; pub mod rpc; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 39c4983d05..77d0817e38 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -25,14 +25,6 @@ use table::error::Error as TableError; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Failed to check region in table: {}, source: {}", table_name, source))] - CheckRegion { - table_name: String, - #[snafu(backtrace)] - source: TableError, - region_number: RegionNumber, - }, - #[snafu(display("Failed to access catalog, source: {}", source))] AccessCatalog { #[snafu(backtrace)] @@ -46,6 +38,13 @@ pub enum Error { source: catalog::error::Error, }, + #[snafu(display("Failed to register table: {}, source: {}", table_name, source))] + RegisterTable { + table_name: String, + location: Location, + source: catalog::error::Error, + }, + #[snafu(display("Failed to open table: {}, source: {}", table_name, source))] OpenTable { table_name: String, @@ -53,11 +52,11 @@ pub enum Error { source: TableError, }, - #[snafu(display("Failed to register table: {}, source: {}", table_name, source))] - RegisterTable { + #[snafu(display("Failed to get table: {}, source: {}", table_name, source))] + GetTable { table_name: String, - #[snafu(backtrace)] - source: catalog::error::Error, + location: Location, + source: TableError, }, #[snafu(display( @@ -73,8 +72,24 @@ pub enum Error { source: TableError, }, - #[snafu(display("Failed to send message: {err_msg}"))] - SendMessage { err_msg: String, location: Location }, + #[snafu(display( + "Failed to check region {} in table: {}, source: {}", + region_number, + table_name, + source + ))] + CheckRegion { + table_name: String, + location: Location, + source: TableError, + region_number: RegionNumber, + }, + + #[snafu(display("Failed to handle heartbeat response, source: {}", source))] + HandleHeartbeatResponse { + location: Location, + source: common_meta::error::Error, + }, #[snafu(display("Failed to execute sql, source: {}", source))] ExecuteSql { @@ -128,13 +143,6 @@ pub enum Error { source: TableError, }, - #[snafu(display("Failed to get table: {}, source: {}", table_name, source))] - GetTable { - table_name: String, - #[snafu(backtrace)] - source: TableError, - }, - #[snafu(display("Failed to drop table {}, source: {}", table_name, source))] DropTable { table_name: String, @@ -438,7 +446,7 @@ pub enum Error { source: JsonError, }, - #[snafu(display("Failed to decode object into json, source: {}", source))] + #[snafu(display("Failed to decode object from json, source: {}", source))] DecodeJson { location: Location, source: JsonError, @@ -472,19 +480,14 @@ impl ErrorExt for Error { | ExecuteStatement { source } | ExecuteLogicalPlan { source } => source.status_code(), - OpenTable { source, .. } => source.status_code(), - RegisterTable { source, .. } - | DeregisterTable { source, .. } - | AccessCatalog { source, .. } => source.status_code(), + HandleHeartbeatResponse { source, .. } => source.status_code(), DecodeLogicalPlan { source } => source.status_code(), NewCatalog { source } | RegisterSchema { source } => source.status_code(), FindTable { source, .. } => source.status_code(), - CreateTable { source, .. } | CheckRegion { source, .. } => source.status_code(), + CreateTable { source, .. } => source.status_code(), DropTable { source, .. } => source.status_code(), FlushTable { source, .. } => source.status_code(), - GetTable { source, .. } => source.status_code(), - CloseTable { source, .. } => source.status_code(), Insert { source, .. } => source.status_code(), Delete { source, .. } => source.status_code(), @@ -524,6 +527,15 @@ impl ErrorExt for Error { StatusCode::Unexpected } + AccessCatalog { source, .. } + | DeregisterTable { source, .. } + | RegisterTable { source, .. } => source.status_code(), + + CheckRegion { source, .. } + | OpenTable { source, .. } + | CloseTable { source, .. } + | GetTable { source, .. } => source.status_code(), + // TODO(yingwen): Further categorize http error. StartServer { .. } | ParseAddr { .. } @@ -534,8 +546,7 @@ impl ErrorExt for Error { | IncorrectInternalState { .. } | ShutdownServer { .. } | ShutdownInstance { .. } - | CloseTableEngine { .. } - | SendMessage { .. } => StatusCode::Internal, + | CloseTableEngine { .. } => StatusCode::Internal, InitBackend { .. } => StatusCode::StorageUnavailable, diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 64c82a930c..cb607e4b24 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -18,23 +18,20 @@ use std::time::Duration; use api::v1::meta::{HeartbeatRequest, NodeStat, Peer}; use catalog::{datanode_stat, CatalogManagerRef}; +use common_meta::heartbeat::handler::{ + HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, +}; +use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef}; +use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{error, info, trace, warn}; -use mailbox::{HeartbeatMailbox, MailboxRef}; use meta_client::client::{HeartbeatSender, MetaClient}; use snafu::ResultExt; use tokio::sync::mpsc; use tokio::time::Instant; -use self::handler::{HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef}; -use self::utils::outgoing_message_to_mailbox_message; -use crate::error::{MetaClientInitSnafu, Result}; +use crate::error::{self, MetaClientInitSnafu, Result}; -pub mod handler; -pub mod utils; - -// TODO(weny): remove allow dead_code -#[allow(dead_code)] -pub mod mailbox; +pub(crate) mod handler; pub struct HeartbeatTask { node_id: u64, @@ -114,7 +111,9 @@ impl HeartbeatTask { handler_executor: HeartbeatResponseHandlerExecutorRef, ) -> Result<()> { trace!("heartbeat response: {:?}", ctx.response); - handler_executor.handle(ctx) + handler_executor + .handle(ctx) + .context(error::HandleHeartbeatResponseSnafu) } /// Start heartbeat task, spawn background task. diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index b41b8fcaed..664f349bcc 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -12,89 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use api::v1::meta::HeartbeatResponse; -use common_telemetry::error; - -use crate::error::Result; -use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef}; - pub mod close_region; pub mod open_region; -pub mod parse_mailbox_message; -#[cfg(test)] -mod tests; - -pub type HeartbeatResponseHandlerExecutorRef = Arc; -pub type HeartbeatResponseHandlerRef = Arc; - -pub struct HeartbeatResponseHandlerContext { - pub mailbox: MailboxRef, - pub response: HeartbeatResponse, - pub incoming_message: Option, -} - -/// HandleControl -/// -/// Controls process of handling heartbeat response. -#[derive(PartialEq)] -pub enum HandleControl { - Continue, - Done, -} - -impl HeartbeatResponseHandlerContext { - pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self { - Self { - mailbox, - response, - incoming_message: None, - } - } -} - -/// HeartbeatResponseHandler -/// -/// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`]. -/// -/// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`]. -pub trait HeartbeatResponseHandler: Send + Sync { - fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool; - - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result; -} - -pub trait HeartbeatResponseHandlerExecutor: Send + Sync { - fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>; -} - -pub struct HandlerGroupExecutor { - handlers: Vec, -} - -impl HandlerGroupExecutor { - pub fn new(handlers: Vec) -> Self { - Self { handlers } - } -} - -impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor { - fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> { - for handler in &self.handlers { - if !handler.is_acceptable(&ctx) { - continue; - } - - match handler.handle(&mut ctx) { - Ok(HandleControl::Done) => break, - Ok(HandleControl::Continue) => {} - Err(e) => { - error!(e;"Error while handling: {:?}", ctx.response); - break; - } - } - } - Ok(()) - } -} diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index 810912d491..ce9061e2bd 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -16,9 +16,12 @@ use std::sync::Arc; use catalog::{CatalogManagerRef, DeregisterTableRequest}; use common_catalog::format_full_table_name; +use common_meta::error::Result as MetaResult; +use common_meta::heartbeat::handler::{ + HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, +}; use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply}; -use common_telemetry::{error, warn}; -use log::info; +use common_telemetry::{error, info, warn}; use snafu::ResultExt; use store_api::storage::RegionNumber; use table::engine::manager::TableEngineManagerRef; @@ -26,8 +29,6 @@ use table::engine::{CloseTableResult, EngineContext, TableReference}; use table::requests::CloseTableRequest; use crate::error::{self, Result}; -use crate::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; -use crate::heartbeat::HeartbeatResponseHandlerContext; #[derive(Clone)] pub struct CloseRegionHandler { @@ -43,7 +44,7 @@ impl HeartbeatResponseHandler for CloseRegionHandler { ) } - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result { + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() else { unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'"); }; diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 95f21823f0..e8369bf038 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -17,6 +17,10 @@ use std::sync::Arc; use catalog::error::Error as CatalogError; use catalog::{CatalogManagerRef, RegisterTableRequest}; use common_catalog::format_full_table_name; +use common_meta::error::Result as MetaResult; +use common_meta::heartbeat::handler::{ + HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, +}; use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply}; use common_telemetry::{error, warn}; use snafu::ResultExt; @@ -26,8 +30,6 @@ use table::engine::EngineContext; use table::requests::OpenTableRequest; use crate::error::{self, Result}; -use crate::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; -use crate::heartbeat::HeartbeatResponseHandlerContext; #[derive(Clone)] pub struct OpenRegionHandler { @@ -43,7 +45,7 @@ impl HeartbeatResponseHandler for OpenRegionHandler { ) } - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result { + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() else { unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'"); }; diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 5903359a27..85ff745b18 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -24,6 +24,8 @@ use common_base::paths::{CLUSTER_DIR, WAL_DIR}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::store::state_store::ObjectStateStore; use common_procedure::ProcedureManagerRef; @@ -60,8 +62,6 @@ use crate::error::{ }; use crate::heartbeat::handler::close_region::CloseRegionHandler; use crate::heartbeat::handler::open_region::OpenRegionHandler; -use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; -use crate::heartbeat::handler::HandlerGroupExecutor; use crate::heartbeat::HeartbeatTask; use crate::sql::{SqlHandler, SqlRequest}; use crate::store; diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index aad0e6e674..2588dfd46a 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -20,6 +20,10 @@ use api::v1::meta::HeartbeatResponse; use api::v1::query_request::Query; use api::v1::QueryRequest; use catalog::CatalogManagerRef; +use common_meta::heartbeat::handler::{ + HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, +}; +use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply}; use common_query::Output; use datatypes::prelude::ConcreteDataType; @@ -31,10 +35,6 @@ use tokio::sync::mpsc::{self, Receiver}; use crate::heartbeat::handler::close_region::CloseRegionHandler; use crate::heartbeat::handler::open_region::OpenRegionHandler; -use crate::heartbeat::handler::{ - HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, -}; -use crate::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; use crate::instance::Instance; pub(crate) mod test_util; @@ -205,6 +205,15 @@ async fn parepare_handler_test(name: &str) -> HandlerTestGuard { } } +pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta { + MessageMeta { + id, + subject: subject.to_string(), + to: to.to_string(), + from: from.to_string(), + } +} + fn handle_instruction( executor: Arc, mailbox: Arc, @@ -213,7 +222,7 @@ fn handle_instruction( let response = HeartbeatResponse::default(); let mut ctx: HeartbeatResponseHandlerContext = HeartbeatResponseHandlerContext::new(mailbox, response); - ctx.incoming_message = Some((MessageMeta::new_test(1, "hi", "foo", "bar"), instruction)); + ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction)); executor.handle(ctx).unwrap(); }