diff --git a/Cargo.lock b/Cargo.lock index a8f23d71c8..e93d75563b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1731,6 +1731,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "common-meta" +version = "0.2.0" +dependencies = [ + "common-error", + "serde", + "serde_json", + "snafu", +] + [[package]] name = "common-procedure" version = "0.2.0" @@ -2463,6 +2473,7 @@ dependencies = [ "common-function", "common-grpc", "common-grpc-expr", + "common-meta", "common-procedure", "common-query", "common-recordbatch", @@ -2479,6 +2490,7 @@ dependencies = [ "futures-util", "humantime-serde", "hyper", + "key-lock", "log", "log-store", "meta-client", @@ -4792,6 +4804,7 @@ dependencies = [ "common-catalog", "common-error", "common-grpc", + "common-meta", "common-procedure", "common-runtime", "common-telemetry", diff --git a/Cargo.toml b/Cargo.toml index aa539fb2b2..3a46a61b6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "src/common/grpc", "src/common/grpc-expr", "src/common/mem-prof", + "src/common/meta", "src/common/procedure", "src/common/procedure-test", "src/common/query", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml new file mode 100644 index 0000000000..6056c748a5 --- /dev/null +++ b/src/common/meta/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "common-meta" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +common-error = { path = "../error" } +serde.workspace = true +snafu.workspace = true +serde_json.workspace = true diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs new file mode 100644 index 0000000000..2c24771436 --- /dev/null +++ b/src/common/meta/src/instruction.rs @@ -0,0 +1,83 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Instruction { + OpenRegion { + catalog: String, + schema: String, + table: String, + table_id: u32, + engine: String, + region_number: u32, + }, + CloseRegion { + catalog: String, + schema: String, + table: String, + table_id: u32, + engine: String, + region_number: u32, + }, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum InstructionReply { + OpenRegion { result: bool, error: Option }, + CloseRegion { result: bool, error: Option }, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_serialize_instruction() { + let open_region = Instruction::OpenRegion { + catalog: "foo".to_string(), + schema: "bar".to_string(), + table: "hi".to_string(), + table_id: 1024, + engine: "mito".to_string(), + region_number: 1, + }; + + let serialized = serde_json::to_string(&open_region).unwrap(); + + assert_eq!( + r#"{"type":"open_region","catalog":"foo","schema":"bar","table":"hi","table_id":1024,"engine":"mito","region_number":1}"#, + serialized + ); + + let close_region = Instruction::CloseRegion { + catalog: "foo".to_string(), + schema: "bar".to_string(), + table: "hi".to_string(), + table_id: 1024, + engine: "mito".to_string(), + region_number: 1, + }; + + let serialized = serde_json::to_string(&close_region).unwrap(); + + assert_eq!( + r#"{"type":"close_region","catalog":"foo","schema":"bar","table":"hi","table_id":1024,"engine":"mito","region_number":1}"#, + serialized + ); + } +} diff --git a/src/meta-srv/src/handler/instruction.rs b/src/common/meta/src/lib.rs similarity index 93% rename from src/meta-srv/src/handler/instruction.rs rename to src/common/meta/src/lib.rs index 93620708bd..d817e21fd7 100644 --- a/src/meta-srv/src/handler/instruction.rs +++ b/src/common/meta/src/lib.rs @@ -12,5 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[derive(Debug)] -pub enum Instruction {} +pub mod instruction; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index d4608b83d8..aff160835a 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -20,6 +20,7 @@ common-datasource = { path = "../common/datasource" } common-function = { path = "../common/function" } common-grpc = { path = "../common/grpc" } common-grpc-expr = { path = "../common/grpc-expr" } +common-meta = { path = "../common/meta" } common-procedure = { path = "../common/procedure" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } @@ -33,6 +34,7 @@ datatypes = { path = "../datatypes" } file-table-engine = { path = "../file-table-engine" } futures = "0.3" futures-util.workspace = true +key-lock = "0.1" hyper = { version = "0.14", features = ["full"] } humantime-serde = "1.1" log = "0.4" diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 79c6b2798b..e0c8e7fcae 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -25,6 +25,9 @@ use table::error::Error as TableError; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to send message: {err_msg}"))] + SendMessage { err_msg: String, location: Location }, + #[snafu(display("Failed to execute sql, source: {}", source))] ExecuteSql { #[snafu(backtrace)] @@ -441,6 +444,15 @@ pub enum Error { location: Location, source: JsonError, }, + + #[snafu(display("Failed to decode object into json, source: {}", source))] + DecodeJson { + location: Location, + source: JsonError, + }, + + #[snafu(display("Payload not exist"))] + PayloadNotExist { location: Location }, } pub type Result = std::result::Result; @@ -501,7 +513,9 @@ impl ErrorExt for Error { | ColumnNoneDefaultValue { .. } | PrepareImmutableTable { .. } => StatusCode::InvalidArguments, - EncodeJson { .. } => StatusCode::Unexpected, + EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => { + StatusCode::Unexpected + } // TODO(yingwen): Further categorize http error. StartServer { .. } @@ -517,7 +531,8 @@ impl ErrorExt for Error { | IncorrectInternalState { .. } | ShutdownServer { .. } | ShutdownInstance { .. } - | CloseTableEngine { .. } => StatusCode::Internal, + | CloseTableEngine { .. } + | SendMessage { .. } => StatusCode::Internal, InitBackend { .. } => StatusCode::StorageUnavailable, diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index c49b9dbc4a..9ce24d066e 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -16,14 +16,26 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, NodeStat, Peer}; +use api::v1::meta::{HeartbeatRequest, NodeStat, Peer}; use catalog::{datanode_stat, CatalogManagerRef}; use common_telemetry::{error, info, trace, warn}; +use mailbox::{HeartbeatMailbox, MailboxRef}; use meta_client::client::{HeartbeatSender, MetaClient}; use snafu::ResultExt; +use tokio::sync::mpsc; +use tokio::time::Instant; +use self::handler::{HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef}; +use self::utils::outgoing_message_to_mailbox_message; use crate::error::{MetaClientInitSnafu, Result}; +pub mod handler; +pub mod utils; + +// TODO(weny): remove allow dead_code +#[allow(dead_code)] +pub mod mailbox; + pub struct HeartbeatTask { node_id: u64, server_addr: String, @@ -32,6 +44,7 @@ pub struct HeartbeatTask { meta_client: Arc, catalog_manager: CatalogManagerRef, interval: u64, + heartbeat_response_handler_exector: HeartbeatResponseHandlerExecutorRef, } impl Drop for HeartbeatTask { @@ -48,6 +61,7 @@ impl HeartbeatTask { server_hostname: Option, meta_client: Arc, catalog_manager: CatalogManagerRef, + heartbeat_response_handler_exector: HeartbeatResponseHandlerExecutorRef, ) -> Self { Self { node_id, @@ -57,12 +71,15 @@ impl HeartbeatTask { meta_client, catalog_manager, interval: 5_000, // default interval is set to 5 secs + heartbeat_response_handler_exector, } } pub async fn create_streams( meta_client: &MetaClient, running: Arc, + handler_executor: HeartbeatResponseHandlerExecutorRef, + mailbox: MailboxRef, ) -> Result { let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?; common_runtime::spawn_bg(async move { @@ -73,7 +90,10 @@ impl HeartbeatTask { None } } { - Self::handle_response(res).await; + let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res); + if let Err(e) = Self::handle_response(ctx, handler_executor.clone()) { + error!(e;"Error while handling heartbeat response"); + } if !running.load(Ordering::Acquire) { info!("Heartbeat task shutdown"); } @@ -83,8 +103,12 @@ impl HeartbeatTask { Ok(tx) } - async fn handle_response(resp: HeartbeatResponse) { - trace!("heartbeat response: {:?}", resp); + fn handle_response( + ctx: HeartbeatResponseHandlerContext, + handler_executor: HeartbeatResponseHandlerExecutorRef, + ) -> Result<()> { + trace!("heartbeat response: {:?}", ctx.response); + handler_executor.handle(ctx) } /// Start heartbeat task, spawn background task. @@ -101,39 +125,92 @@ impl HeartbeatTask { let node_id = self.node_id; let addr = resolve_addr(&self.server_addr, &self.server_hostname); let meta_client = self.meta_client.clone(); - let catalog_manager_clone = self.catalog_manager.clone(); - let mut tx = Self::create_streams(&meta_client, running.clone()).await?; + let handler_executor = self.heartbeat_response_handler_exector.clone(); + + let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16); + let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx)); + + let mut tx = Self::create_streams( + &meta_client, + running.clone(), + handler_executor.clone(), + mailbox.clone(), + ) + .await?; + common_runtime::spawn_bg(async move { - while running.load(Ordering::Acquire) { - let (region_num, region_stats) = datanode_stat(&catalog_manager_clone).await; + let sleep = tokio::time::sleep(Duration::from_millis(0)); + tokio::pin!(sleep); - let req = HeartbeatRequest { - peer: Some(Peer { - id: node_id, - addr: addr.clone(), - }), - node_stat: Some(NodeStat { - region_num: region_num as _, - ..Default::default() - }), - region_stats, - ..Default::default() - }; - - if let Err(e) = tx.send(req).await { - error!("Failed to send heartbeat to metasrv, error: {:?}", e); - match Self::create_streams(&meta_client, running.clone()).await { - Ok(new_tx) => { - info!("Reconnected to metasrv"); - tx = new_tx; + loop { + if !running.load(Ordering::Acquire) { + info!("shutdown heartbeat task"); + break; + } + let req = tokio::select! { + message = outgoing_rx.recv() => { + if let Some(message) = message { + match outgoing_message_to_mailbox_message(message) { + Ok(message) => { + let req = HeartbeatRequest { + peer: Some(Peer { + id: node_id, + addr: addr.clone(), + }), + mailbox_message: Some(message), + ..Default::default() + }; + Some(req) + } + Err(e) => { + error!(e;"Failed to encode mailbox messages!"); + None + } + } + } else { + None } - Err(e) => { - error!(e;"Failed to reconnect to metasrv!"); + } + _ = &mut sleep => { + let (region_num, region_stats) = datanode_stat(&catalog_manager_clone).await; + let req = HeartbeatRequest { + peer: Some(Peer { + id: node_id, + addr: addr.clone(), + }), + node_stat: Some(NodeStat { + region_num: region_num as _, + ..Default::default() + }), + region_stats, + ..Default::default() + }; + sleep.as_mut().reset(Instant::now() + Duration::from_millis(interval)); + Some(req) + } + }; + if let Some(req) = req { + if let Err(e) = tx.send(req).await { + error!("Failed to send heartbeat to metasrv, error: {:?}", e); + match Self::create_streams( + &meta_client, + running.clone(), + handler_executor.clone(), + mailbox.clone(), + ) + .await + { + Ok(new_tx) => { + info!("Reconnected to metasrv"); + tx = new_tx; + } + Err(e) => { + error!(e;"Failed to reconnect to metasrv!"); + } } } } - tokio::time::sleep(Duration::from_millis(interval)).await; } }); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs new file mode 100644 index 0000000000..9804316022 --- /dev/null +++ b/src/datanode/src/heartbeat/handler.rs @@ -0,0 +1,89 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::meta::HeartbeatResponse; +use common_telemetry::error; + +use crate::error::Result; +use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef}; + +pub mod parse_mailbox_message; +#[cfg(test)] +mod tests; + +pub type HeartbeatResponseHandlerExecutorRef = Arc; +pub type HeartbeatResponseHandlerRef = Arc; + +pub struct HeartbeatResponseHandlerContext { + pub mailbox: MailboxRef, + pub response: HeartbeatResponse, + pub incoming_message: Option, + is_skip_all: bool, +} + +impl HeartbeatResponseHandlerContext { + pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self { + Self { + mailbox, + response, + incoming_message: None, + is_skip_all: false, + } + } + + pub fn is_skip_all(&self) -> bool { + self.is_skip_all + } +} + +pub trait HeartbeatResponseHandler: Send + Sync { + fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool; + + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()>; +} + +pub trait HeartbeatResponseHandlerExecutor: Send + Sync { + fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>; +} + +pub struct HandlerGroupExecutor { + handlers: Vec, +} + +impl HandlerGroupExecutor { + pub fn new(handlers: Vec) -> Self { + Self { handlers } + } +} + +impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor { + fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> { + for handler in &self.handlers { + if ctx.is_skip_all() { + break; + } + + if !handler.is_acceptable(&ctx) { + continue; + } + + if let Err(e) = handler.handle(&mut ctx) { + error!(e;"Error while handling: {:?}", ctx.response); + } + } + Ok(()) + } +} diff --git a/src/datanode/src/heartbeat/handler/parse_mailbox_message.rs b/src/datanode/src/heartbeat/handler/parse_mailbox_message.rs new file mode 100644 index 0000000000..fc0ee46a5a --- /dev/null +++ b/src/datanode/src/heartbeat/handler/parse_mailbox_message.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::Result; +use crate::heartbeat::handler::{HeartbeatResponseHandler, HeartbeatResponseHandlerContext}; +use crate::heartbeat::utils::mailbox_message_to_incoming_message; + +#[derive(Default)] +pub struct ParseMailboxMessageHandler; + +impl HeartbeatResponseHandler for ParseMailboxMessageHandler { + fn is_acceptable(&self, _ctx: &HeartbeatResponseHandlerContext) -> bool { + true + } + + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()> { + if let Some(message) = &ctx.response.mailbox_message { + if message.payload.is_some() { + // mailbox_message_to_incoming_message will raise an error if payload is none + ctx.incoming_message = Some(mailbox_message_to_incoming_message(message.clone())?) + } + } + + Ok(()) + } +} diff --git a/src/datanode/src/heartbeat/handler/tests.rs b/src/datanode/src/heartbeat/handler/tests.rs new file mode 100644 index 0000000000..f9e56e2f9f --- /dev/null +++ b/src/datanode/src/heartbeat/handler/tests.rs @@ -0,0 +1,36 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::instruction::InstructionReply; +use tokio::sync::mpsc; + +use crate::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; + +#[tokio::test] +async fn test_heartbeat_mailbox() { + let (tx, mut rx) = mpsc::channel(8); + + let mailbox = HeartbeatMailbox::new(tx); + + let meta = MessageMeta::new_test(1, "test", "foo", "bar"); + let reply = InstructionReply::OpenRegion { + result: true, + error: None, + }; + mailbox.send((meta.clone(), reply.clone())).await.unwrap(); + + let message = rx.recv().await.unwrap(); + assert_eq!(message.0, meta); + assert_eq!(message.1, reply); +} diff --git a/src/datanode/src/heartbeat/mailbox.rs b/src/datanode/src/heartbeat/mailbox.rs new file mode 100644 index 0000000000..3a00ec3622 --- /dev/null +++ b/src/datanode/src/heartbeat/mailbox.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_meta::instruction::{Instruction, InstructionReply}; +use tokio::sync::mpsc::Sender; + +use crate::error::{self, Result}; + +pub type IncomingMessage = (MessageMeta, Instruction); +pub type OutgoingMessage = (MessageMeta, InstructionReply); + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct MessageMeta { + pub id: u64, + pub subject: String, + pub to: String, + pub from: String, +} + +#[cfg(test)] +impl MessageMeta { + pub fn new_test(id: u64, subject: &str, to: &str, from: &str) -> Self { + MessageMeta { + id, + subject: subject.to_string(), + to: to.to_string(), + from: from.to_string(), + } + } +} + +pub struct HeartbeatMailbox { + sender: Sender, +} + +impl HeartbeatMailbox { + pub fn new(sender: Sender) -> Self { + Self { sender } + } + + pub async fn send(&self, message: OutgoingMessage) -> Result<()> { + self.sender.send(message).await.map_err(|e| { + error::SendMessageSnafu { + err_msg: e.to_string(), + } + .build() + }) + } +} + +pub type MailboxRef = Arc; diff --git a/src/datanode/src/heartbeat/utils.rs b/src/datanode/src/heartbeat/utils.rs new file mode 100644 index 0000000000..2deb09800f --- /dev/null +++ b/src/datanode/src/heartbeat/utils.rs @@ -0,0 +1,58 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::mailbox_message::Payload; +use api::v1::meta::MailboxMessage; +use common_meta::instruction::Instruction; +use common_time::util::current_time_millis; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{self, Result}; +use crate::heartbeat::mailbox::{IncomingMessage, MessageMeta, OutgoingMessage}; + +pub fn mailbox_message_to_incoming_message(m: MailboxMessage) -> Result { + m.payload + .map(|payload| match payload { + Payload::Json(json) => { + let instruction: Instruction = serde_json::from_str(&json)?; + Ok(( + MessageMeta { + id: m.id, + subject: m.subject, + to: m.to, + from: m.from, + }, + instruction, + )) + } + }) + .transpose() + .context(error::DecodeJsonSnafu)? + .context(error::PayloadNotExistSnafu) +} + +pub fn outgoing_message_to_mailbox_message( + (meta, reply): OutgoingMessage, +) -> Result { + Ok(MailboxMessage { + id: meta.id, + subject: meta.subject, + from: meta.to, + to: meta.from, + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&reply).context(error::EncodeJsonSnafu)?, + )), + }) +} diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 6e81e5db52..e49de2afa8 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -62,6 +62,8 @@ use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu, }; +use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use crate::heartbeat::handler::HandlerGroupExecutor; use crate::heartbeat::HeartbeatTask; use crate::sql::{SqlHandler, SqlRequest}; @@ -196,6 +198,9 @@ impl Instance { let factory = QueryEngineFactory::new(catalog_manager.clone()); let query_engine = factory.query_engine(); + let handlder_executor = + HandlerGroupExecutor::new(vec![Arc::new(ParseMailboxMessageHandler::default())]); + let heartbeat_task = match opts.mode { Mode::Standalone => None, Mode::Distributed => Some(HeartbeatTask::new( @@ -204,6 +209,7 @@ impl Instance { opts.rpc_hostname.clone(), meta_client.as_ref().unwrap().clone(), catalog_manager.clone(), + Arc::new(handlder_executor), )), }; diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 6036bf19e7..3e3491da52 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -17,6 +17,7 @@ common-base = { path = "../common/base" } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } +common-meta = { path = "../common/meta" } common-procedure = { path = "../common/procedure" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index a0b0c38e52..64b829e2c6 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -22,6 +22,7 @@ use api::v1::meta::{ }; pub use check_leader_handler::CheckLeaderHandler; pub use collect_stats_handler::CollectStatsHandler; +use common_meta::instruction::Instruction; use common_telemetry::{info, warn}; use dashmap::DashMap; pub use failure_handler::RegionFailureHandler; @@ -34,18 +35,15 @@ use snafu::OptionExt; use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, Notify, RwLock}; -use self::instruction::Instruction; use self::node_stat::Stat; use crate::error::{self, Result}; use crate::metasrv::Context; use crate::metrics::METRIC_META_HEARTBEAT_CONNECTION_NUM; use crate::sequence::Sequence; use crate::service::mailbox::{Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId}; - mod check_leader_handler; mod collect_stats_handler; mod failure_handler; -mod instruction; mod keep_lease_handler; pub mod mailbox_handler; pub mod node_stat;