feat(datanode): iImplement the heartbeat response handler (#1547)

* feat(datanode): implement instruction handler

* chore: apply suggestion from CR

* refactor: refactor heartbeat response handler
This commit is contained in:
Weny Xu
2023-05-11 10:27:13 +09:00
committed by GitHub
parent 7a9dd5f0c8
commit 44aef6fcbd
16 changed files with 527 additions and 37 deletions

13
Cargo.lock generated
View File

@@ -1731,6 +1731,16 @@ dependencies = [
"tokio",
]
[[package]]
name = "common-meta"
version = "0.2.0"
dependencies = [
"common-error",
"serde",
"serde_json",
"snafu",
]
[[package]]
name = "common-procedure"
version = "0.2.0"
@@ -2463,6 +2473,7 @@ dependencies = [
"common-function",
"common-grpc",
"common-grpc-expr",
"common-meta",
"common-procedure",
"common-query",
"common-recordbatch",
@@ -2479,6 +2490,7 @@ dependencies = [
"futures-util",
"humantime-serde",
"hyper",
"key-lock",
"log",
"log-store",
"meta-client",
@@ -4792,6 +4804,7 @@ dependencies = [
"common-catalog",
"common-error",
"common-grpc",
"common-meta",
"common-procedure",
"common-runtime",
"common-telemetry",

View File

@@ -14,6 +14,7 @@ members = [
"src/common/grpc",
"src/common/grpc-expr",
"src/common/mem-prof",
"src/common/meta",
"src/common/procedure",
"src/common/procedure-test",
"src/common/query",

View File

@@ -0,0 +1,11 @@
[package]
name = "common-meta"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
common-error = { path = "../error" }
serde.workspace = true
snafu.workspace = true
serde_json.workspace = true

View File

@@ -0,0 +1,83 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Instruction {
OpenRegion {
catalog: String,
schema: String,
table: String,
table_id: u32,
engine: String,
region_number: u32,
},
CloseRegion {
catalog: String,
schema: String,
table: String,
table_id: u32,
engine: String,
region_number: u32,
},
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
OpenRegion { result: bool, error: Option<String> },
CloseRegion { result: bool, error: Option<String> },
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialize_instruction() {
let open_region = Instruction::OpenRegion {
catalog: "foo".to_string(),
schema: "bar".to_string(),
table: "hi".to_string(),
table_id: 1024,
engine: "mito".to_string(),
region_number: 1,
};
let serialized = serde_json::to_string(&open_region).unwrap();
assert_eq!(
r#"{"type":"open_region","catalog":"foo","schema":"bar","table":"hi","table_id":1024,"engine":"mito","region_number":1}"#,
serialized
);
let close_region = Instruction::CloseRegion {
catalog: "foo".to_string(),
schema: "bar".to_string(),
table: "hi".to_string(),
table_id: 1024,
engine: "mito".to_string(),
region_number: 1,
};
let serialized = serde_json::to_string(&close_region).unwrap();
assert_eq!(
r#"{"type":"close_region","catalog":"foo","schema":"bar","table":"hi","table_id":1024,"engine":"mito","region_number":1}"#,
serialized
);
}
}

View File

@@ -12,5 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[derive(Debug)]
pub enum Instruction {}
pub mod instruction;

View File

@@ -20,6 +20,7 @@ common-datasource = { path = "../common/datasource" }
common-function = { path = "../common/function" }
common-grpc = { path = "../common/grpc" }
common-grpc-expr = { path = "../common/grpc-expr" }
common-meta = { path = "../common/meta" }
common-procedure = { path = "../common/procedure" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
@@ -33,6 +34,7 @@ datatypes = { path = "../datatypes" }
file-table-engine = { path = "../file-table-engine" }
futures = "0.3"
futures-util.workspace = true
key-lock = "0.1"
hyper = { version = "0.14", features = ["full"] }
humantime-serde = "1.1"
log = "0.4"

View File

@@ -25,6 +25,9 @@ use table::error::Error as TableError;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to send message: {err_msg}"))]
SendMessage { err_msg: String, location: Location },
#[snafu(display("Failed to execute sql, source: {}", source))]
ExecuteSql {
#[snafu(backtrace)]
@@ -441,6 +444,15 @@ pub enum Error {
location: Location,
source: JsonError,
},
#[snafu(display("Failed to decode object into json, source: {}", source))]
DecodeJson {
location: Location,
source: JsonError,
},
#[snafu(display("Payload not exist"))]
PayloadNotExist { location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -501,7 +513,9 @@ impl ErrorExt for Error {
| ColumnNoneDefaultValue { .. }
| PrepareImmutableTable { .. } => StatusCode::InvalidArguments,
EncodeJson { .. } => StatusCode::Unexpected,
EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => {
StatusCode::Unexpected
}
// TODO(yingwen): Further categorize http error.
StartServer { .. }
@@ -517,7 +531,8 @@ impl ErrorExt for Error {
| IncorrectInternalState { .. }
| ShutdownServer { .. }
| ShutdownInstance { .. }
| CloseTableEngine { .. } => StatusCode::Internal,
| CloseTableEngine { .. }
| SendMessage { .. } => StatusCode::Internal,
InitBackend { .. } => StatusCode::StorageUnavailable,

View File

@@ -16,14 +16,26 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, NodeStat, Peer};
use api::v1::meta::{HeartbeatRequest, NodeStat, Peer};
use catalog::{datanode_stat, CatalogManagerRef};
use common_telemetry::{error, info, trace, warn};
use mailbox::{HeartbeatMailbox, MailboxRef};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::time::Instant;
use self::handler::{HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef};
use self::utils::outgoing_message_to_mailbox_message;
use crate::error::{MetaClientInitSnafu, Result};
pub mod handler;
pub mod utils;
// TODO(weny): remove allow dead_code
#[allow(dead_code)]
pub mod mailbox;
pub struct HeartbeatTask {
node_id: u64,
server_addr: String,
@@ -32,6 +44,7 @@ pub struct HeartbeatTask {
meta_client: Arc<MetaClient>,
catalog_manager: CatalogManagerRef,
interval: u64,
heartbeat_response_handler_exector: HeartbeatResponseHandlerExecutorRef,
}
impl Drop for HeartbeatTask {
@@ -48,6 +61,7 @@ impl HeartbeatTask {
server_hostname: Option<String>,
meta_client: Arc<MetaClient>,
catalog_manager: CatalogManagerRef,
heartbeat_response_handler_exector: HeartbeatResponseHandlerExecutorRef,
) -> Self {
Self {
node_id,
@@ -57,12 +71,15 @@ impl HeartbeatTask {
meta_client,
catalog_manager,
interval: 5_000, // default interval is set to 5 secs
heartbeat_response_handler_exector,
}
}
pub async fn create_streams(
meta_client: &MetaClient,
running: Arc<AtomicBool>,
handler_executor: HeartbeatResponseHandlerExecutorRef,
mailbox: MailboxRef,
) -> Result<HeartbeatSender> {
let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
common_runtime::spawn_bg(async move {
@@ -73,7 +90,10 @@ impl HeartbeatTask {
None
}
} {
Self::handle_response(res).await;
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
if let Err(e) = Self::handle_response(ctx, handler_executor.clone()) {
error!(e;"Error while handling heartbeat response");
}
if !running.load(Ordering::Acquire) {
info!("Heartbeat task shutdown");
}
@@ -83,8 +103,12 @@ impl HeartbeatTask {
Ok(tx)
}
async fn handle_response(resp: HeartbeatResponse) {
trace!("heartbeat response: {:?}", resp);
fn handle_response(
ctx: HeartbeatResponseHandlerContext,
handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Result<()> {
trace!("heartbeat response: {:?}", ctx.response);
handler_executor.handle(ctx)
}
/// Start heartbeat task, spawn background task.
@@ -101,39 +125,92 @@ impl HeartbeatTask {
let node_id = self.node_id;
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
let meta_client = self.meta_client.clone();
let catalog_manager_clone = self.catalog_manager.clone();
let mut tx = Self::create_streams(&meta_client, running.clone()).await?;
let handler_executor = self.heartbeat_response_handler_exector.clone();
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
let mut tx = Self::create_streams(
&meta_client,
running.clone(),
handler_executor.clone(),
mailbox.clone(),
)
.await?;
common_runtime::spawn_bg(async move {
while running.load(Ordering::Acquire) {
let (region_num, region_stats) = datanode_stat(&catalog_manager_clone).await;
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
let req = HeartbeatRequest {
peer: Some(Peer {
id: node_id,
addr: addr.clone(),
}),
node_stat: Some(NodeStat {
region_num: region_num as _,
..Default::default()
}),
region_stats,
..Default::default()
};
if let Err(e) = tx.send(req).await {
error!("Failed to send heartbeat to metasrv, error: {:?}", e);
match Self::create_streams(&meta_client, running.clone()).await {
Ok(new_tx) => {
info!("Reconnected to metasrv");
tx = new_tx;
loop {
if !running.load(Ordering::Acquire) {
info!("shutdown heartbeat task");
break;
}
let req = tokio::select! {
message = outgoing_rx.recv() => {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
peer: Some(Peer {
id: node_id,
addr: addr.clone(),
}),
mailbox_message: Some(message),
..Default::default()
};
Some(req)
}
Err(e) => {
error!(e;"Failed to encode mailbox messages!");
None
}
}
} else {
None
}
Err(e) => {
error!(e;"Failed to reconnect to metasrv!");
}
_ = &mut sleep => {
let (region_num, region_stats) = datanode_stat(&catalog_manager_clone).await;
let req = HeartbeatRequest {
peer: Some(Peer {
id: node_id,
addr: addr.clone(),
}),
node_stat: Some(NodeStat {
region_num: region_num as _,
..Default::default()
}),
region_stats,
..Default::default()
};
sleep.as_mut().reset(Instant::now() + Duration::from_millis(interval));
Some(req)
}
};
if let Some(req) = req {
if let Err(e) = tx.send(req).await {
error!("Failed to send heartbeat to metasrv, error: {:?}", e);
match Self::create_streams(
&meta_client,
running.clone(),
handler_executor.clone(),
mailbox.clone(),
)
.await
{
Ok(new_tx) => {
info!("Reconnected to metasrv");
tx = new_tx;
}
Err(e) => {
error!(e;"Failed to reconnect to metasrv!");
}
}
}
}
tokio::time::sleep(Duration::from_millis(interval)).await;
}
});

View File

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::HeartbeatResponse;
use common_telemetry::error;
use crate::error::Result;
use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
pub mod parse_mailbox_message;
#[cfg(test)]
mod tests;
pub type HeartbeatResponseHandlerExecutorRef = Arc<dyn HeartbeatResponseHandlerExecutor>;
pub type HeartbeatResponseHandlerRef = Arc<dyn HeartbeatResponseHandler>;
pub struct HeartbeatResponseHandlerContext {
pub mailbox: MailboxRef,
pub response: HeartbeatResponse,
pub incoming_message: Option<IncomingMessage>,
is_skip_all: bool,
}
impl HeartbeatResponseHandlerContext {
pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self {
Self {
mailbox,
response,
incoming_message: None,
is_skip_all: false,
}
}
pub fn is_skip_all(&self) -> bool {
self.is_skip_all
}
}
pub trait HeartbeatResponseHandler: Send + Sync {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()>;
}
pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
}
pub struct HandlerGroupExecutor {
handlers: Vec<HeartbeatResponseHandlerRef>,
}
impl HandlerGroupExecutor {
pub fn new(handlers: Vec<HeartbeatResponseHandlerRef>) -> Self {
Self { handlers }
}
}
impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
for handler in &self.handlers {
if ctx.is_skip_all() {
break;
}
if !handler.is_acceptable(&ctx) {
continue;
}
if let Err(e) = handler.handle(&mut ctx) {
error!(e;"Error while handling: {:?}", ctx.response);
}
}
Ok(())
}
}

View File

@@ -0,0 +1,37 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::Result;
use crate::heartbeat::handler::{HeartbeatResponseHandler, HeartbeatResponseHandlerContext};
use crate::heartbeat::utils::mailbox_message_to_incoming_message;
#[derive(Default)]
pub struct ParseMailboxMessageHandler;
impl HeartbeatResponseHandler for ParseMailboxMessageHandler {
fn is_acceptable(&self, _ctx: &HeartbeatResponseHandlerContext) -> bool {
true
}
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()> {
if let Some(message) = &ctx.response.mailbox_message {
if message.payload.is_some() {
// mailbox_message_to_incoming_message will raise an error if payload is none
ctx.incoming_message = Some(mailbox_message_to_incoming_message(message.clone())?)
}
}
Ok(())
}
}

View File

@@ -0,0 +1,36 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::InstructionReply;
use tokio::sync::mpsc;
use crate::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
#[tokio::test]
async fn test_heartbeat_mailbox() {
let (tx, mut rx) = mpsc::channel(8);
let mailbox = HeartbeatMailbox::new(tx);
let meta = MessageMeta::new_test(1, "test", "foo", "bar");
let reply = InstructionReply::OpenRegion {
result: true,
error: None,
};
mailbox.send((meta.clone(), reply.clone())).await.unwrap();
let message = rx.recv().await.unwrap();
assert_eq!(message.0, meta);
assert_eq!(message.1, reply);
}

View File

@@ -0,0 +1,64 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::instruction::{Instruction, InstructionReply};
use tokio::sync::mpsc::Sender;
use crate::error::{self, Result};
pub type IncomingMessage = (MessageMeta, Instruction);
pub type OutgoingMessage = (MessageMeta, InstructionReply);
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct MessageMeta {
pub id: u64,
pub subject: String,
pub to: String,
pub from: String,
}
#[cfg(test)]
impl MessageMeta {
pub fn new_test(id: u64, subject: &str, to: &str, from: &str) -> Self {
MessageMeta {
id,
subject: subject.to_string(),
to: to.to_string(),
from: from.to_string(),
}
}
}
pub struct HeartbeatMailbox {
sender: Sender<OutgoingMessage>,
}
impl HeartbeatMailbox {
pub fn new(sender: Sender<OutgoingMessage>) -> Self {
Self { sender }
}
pub async fn send(&self, message: OutgoingMessage) -> Result<()> {
self.sender.send(message).await.map_err(|e| {
error::SendMessageSnafu {
err_msg: e.to_string(),
}
.build()
})
}
}
pub type MailboxRef = Arc<HeartbeatMailbox>;

View File

@@ -0,0 +1,58 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::Instruction;
use common_time::util::current_time_millis;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::heartbeat::mailbox::{IncomingMessage, MessageMeta, OutgoingMessage};
pub fn mailbox_message_to_incoming_message(m: MailboxMessage) -> Result<IncomingMessage> {
m.payload
.map(|payload| match payload {
Payload::Json(json) => {
let instruction: Instruction = serde_json::from_str(&json)?;
Ok((
MessageMeta {
id: m.id,
subject: m.subject,
to: m.to,
from: m.from,
},
instruction,
))
}
})
.transpose()
.context(error::DecodeJsonSnafu)?
.context(error::PayloadNotExistSnafu)
}
pub fn outgoing_message_to_mailbox_message(
(meta, reply): OutgoingMessage,
) -> Result<MailboxMessage> {
Ok(MailboxMessage {
id: meta.id,
subject: meta.subject,
from: meta.to,
to: meta.from,
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&reply).context(error::EncodeJsonSnafu)?,
)),
})
}

View File

@@ -62,6 +62,8 @@ use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu,
};
use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use crate::heartbeat::handler::HandlerGroupExecutor;
use crate::heartbeat::HeartbeatTask;
use crate::sql::{SqlHandler, SqlRequest};
@@ -196,6 +198,9 @@ impl Instance {
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine();
let handlder_executor =
HandlerGroupExecutor::new(vec![Arc::new(ParseMailboxMessageHandler::default())]);
let heartbeat_task = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => Some(HeartbeatTask::new(
@@ -204,6 +209,7 @@ impl Instance {
opts.rpc_hostname.clone(),
meta_client.as_ref().unwrap().clone(),
catalog_manager.clone(),
Arc::new(handlder_executor),
)),
};

View File

@@ -17,6 +17,7 @@ common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-meta = { path = "../common/meta" }
common-procedure = { path = "../common/procedure" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }

View File

@@ -22,6 +22,7 @@ use api::v1::meta::{
};
pub use check_leader_handler::CheckLeaderHandler;
pub use collect_stats_handler::CollectStatsHandler;
use common_meta::instruction::Instruction;
use common_telemetry::{info, warn};
use dashmap::DashMap;
pub use failure_handler::RegionFailureHandler;
@@ -34,18 +35,15 @@ use snafu::OptionExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, Notify, RwLock};
use self::instruction::Instruction;
use self::node_stat::Stat;
use crate::error::{self, Result};
use crate::metasrv::Context;
use crate::metrics::METRIC_META_HEARTBEAT_CONNECTION_NUM;
use crate::sequence::Sequence;
use crate::service::mailbox::{Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId};
mod check_leader_handler;
mod collect_stats_handler;
mod failure_handler;
mod instruction;
mod keep_lease_handler;
pub mod mailbox_handler;
pub mod node_stat;