From d7a906e0bd81ff2b786fd6e88bb703afe67a3e22 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Thu, 4 May 2023 15:42:43 +0800 Subject: [PATCH] feat: metasrv mailbox (#1481) * refactor: id first in pusher_key * feat: is_acceptable for multi roles * feat: mailbox * fix: channel for mailbox * feat: impl mailbox via heartbeat * chore: add unit test for mailbox * chore: by cr * chore: typo * chore: refactor the mailbox API * chore: br cr * chore: check timeout interval to 10ms * chore: add response header --- Cargo.lock | 2 +- src/api/Cargo.toml | 2 +- src/meta-srv/src/error.rs | 27 ++ src/meta-srv/src/handler.rs | 299 +++++++++++++++++- .../src/handler/check_leader_handler.rs | 6 +- .../src/handler/collect_stats_handler.rs | 12 +- src/meta-srv/src/handler/failure_handler.rs | 10 +- .../src/handler/keep_lease_handler.rs | 12 +- src/meta-srv/src/handler/mailbox_handler.rs | 47 +++ src/meta-srv/src/handler/on_leader_start.rs | 6 +- .../src/handler/persist_stats_handler.rs | 15 +- .../src/handler/response_header_handler.rs | 14 +- src/meta-srv/src/metasrv.rs | 10 + src/meta-srv/src/metasrv/builder.rs | 11 +- src/meta-srv/src/service.rs | 1 + src/meta-srv/src/service/heartbeat.rs | 15 +- src/meta-srv/src/service/mailbox.rs | 76 +++++ 17 files changed, 507 insertions(+), 58 deletions(-) create mode 100644 src/meta-srv/src/handler/mailbox_handler.rs create mode 100644 src/meta-srv/src/service/mailbox.rs diff --git a/Cargo.lock b/Cargo.lock index e3dd55c39e..ecf7dd9eed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3821,7 +3821,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a26c40c004f998180b8acd853b22f083773f36b9#a26c40c004f998180b8acd853b22f083773f36b9" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e8abf8241c908448dce595399e89c89a40d048bd#e8abf8241c908448dce595399e89c89a40d048bd" dependencies = [ "prost", "tonic 0.9.2", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 1f6163b99f..b4b0523959 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a26c40c004f998180b8acd853b22f083773f36b9" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e8abf8241c908448dce595399e89c89a40d048bd" } prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index ec580fd5a1..9bc7be2c32 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -291,6 +291,28 @@ pub enum Error { schema_name: String, location: Location, }, + + #[snafu(display("Pusher not found: {pusher_id}"))] + PusherNotFound { + pusher_id: String, + location: Location, + }, + + #[snafu(display("Failed to push message: {err_msg}"))] + PushMessage { err_msg: String, location: Location }, + + #[snafu(display("Mailbox already closed: {id}"))] + MailboxClosed { id: u64, location: Location }, + + #[snafu(display("Mailbox timeout: {id}"))] + MailboxTimeout { id: u64, location: Location }, + + #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))] + MailboxReceiver { + id: u64, + err_msg: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -332,6 +354,11 @@ impl ErrorExt for Error { | Error::SendShutdownSignal { .. } | Error::ParseAddr { .. } | Error::SchemaAlreadyExists { .. } + | Error::PusherNotFound { .. } + | Error::PushMessage { .. } + | Error::MailboxClosed { .. } + | Error::MailboxTimeout { .. } + | Error::MailboxReceiver { .. } | Error::StartGrpc { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index d878d9103f..685d059ec6 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -12,39 +12,49 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::Duration; + +use api::v1::meta::{ + HeartbeatRequest, HeartbeatResponse, MailboxMessage, RequestHeader, ResponseHeader, Role, + PROTOCOL_VERSION, +}; pub use check_leader_handler::CheckLeaderHandler; pub use collect_stats_handler::CollectStatsHandler; +use common_telemetry::{info, warn}; +use dashmap::DashMap; pub use failure_handler::RegionFailureHandler; pub use keep_lease_handler::KeepLeaseHandler; pub use on_leader_start::OnLeaderStartHandler; pub use persist_stats_handler::PersistStatsHandler; pub use response_header_handler::ResponseHeaderHandler; +use snafu::OptionExt; +use tokio::sync::mpsc::Sender; +use tokio::sync::{oneshot, Notify, RwLock}; + +use self::instruction::Instruction; +use self::node_stat::Stat; +use crate::error::{self, Result}; +use crate::metasrv::Context; +use crate::sequence::Sequence; +use crate::service::mailbox::{Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId}; mod check_leader_handler; mod collect_stats_handler; mod failure_handler; mod instruction; mod keep_lease_handler; +pub mod mailbox_handler; pub mod node_stat; mod on_leader_start; mod persist_stats_handler; mod response_header_handler; -use std::collections::BTreeMap; -use std::sync::Arc; - -use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, ResponseHeader}; -use common_telemetry::info; -use tokio::sync::mpsc::Sender; -use tokio::sync::RwLock; - -use self::instruction::Instruction; -use self::node_stat::Stat; -use crate::error::Result; -use crate::metasrv::Context; - #[async_trait::async_trait] pub trait HeartbeatHandler: Send + Sync { + fn is_acceptable(&self, role: Role) -> bool; + async fn handle( &self, req: &HeartbeatRequest, @@ -61,13 +71,46 @@ pub struct HeartbeatAccumulator { } impl HeartbeatAccumulator { - pub fn into_payload(self) -> Vec> { + pub fn into_mailbox_messages(self) -> Vec { // TODO(jiachun): to HeartbeatResponse payload vec![] } } -pub type Pusher = Sender>; +pub struct Pusher { + sender: Sender>, + res_header: ResponseHeader, +} + +impl Pusher { + pub fn new( + sender: Sender>, + req_header: &Option, + ) -> Self { + let res_header = ResponseHeader { + protocol_version: PROTOCOL_VERSION, + cluster_id: req_header.as_ref().map_or(0, |h| h.cluster_id), + ..Default::default() + }; + + Self { sender, res_header } + } + + #[inline] + pub async fn push(&self, res: HeartbeatResponse) -> Result<()> { + self.sender.send(Ok(res)).await.map_err(|e| { + error::PushMessageSnafu { + err_msg: e.to_string(), + } + .build() + }) + } + + #[inline] + pub fn header(&self) -> ResponseHeader { + self.res_header.clone() + } +} #[derive(Clone, Default)] pub struct HeartbeatHandlerGroup { @@ -95,6 +138,10 @@ impl HeartbeatHandlerGroup { pushers.remove(key) } + pub fn pushers(&self) -> Arc>> { + self.pushers.clone() + } + pub async fn handle( &self, req: HeartbeatRequest, @@ -103,13 +150,231 @@ impl HeartbeatHandlerGroup { let mut acc = HeartbeatAccumulator::default(); let handlers = self.handlers.read().await; for h in handlers.iter() { - h.handle(&req, &mut ctx, &mut acc).await?; + if ctx.is_skip_all() { + break; + } + + let role = req + .header + .as_ref() + .and_then(|h| Role::from_i32(h.role)) + .context(error::InvalidArgumentsSnafu { + err_msg: format!("invalid role: {:?}", req.header), + })?; + + if h.is_acceptable(role) { + h.handle(&req, &mut ctx, &mut acc).await?; + } } let header = std::mem::take(&mut acc.header); let res = HeartbeatResponse { header, - payload: acc.into_payload(), + mailbox_messages: acc.into_mailbox_messages(), }; Ok(res) } } + +pub struct HeartbeatMailbox { + pushers: Arc>>, + sequence: Sequence, + senders: DashMap>>, + timeouts: DashMap, + timeout_notify: Notify, +} + +impl HeartbeatMailbox { + pub fn create( + pushers: Arc>>, + sequence: Sequence, + ) -> MailboxRef { + let mailbox = Arc::new(Self::new(pushers, sequence)); + + let timeout_checker = mailbox.clone(); + common_runtime::spawn_bg(async move { + timeout_checker.check_timeout_bg(10).await; + }); + + mailbox + } + + fn new(pushers: Arc>>, sequence: Sequence) -> Self { + Self { + pushers, + sequence, + senders: DashMap::default(), + timeouts: DashMap::default(), + timeout_notify: Notify::new(), + } + } + + async fn check_timeout_bg(&self, interval_millis: u64) { + let mut interval = tokio::time::interval(Duration::from_millis(interval_millis)); + + loop { + interval.tick().await; + + if self.timeouts.is_empty() { + self.timeout_notify.notified().await; + } + + let now = Duration::from_millis(common_time::util::current_time_millis() as u64); + let timeout_ids = self + .timeouts + .iter() + .filter_map(|entry| { + let (id, deadline) = entry.pair(); + if deadline < &now { + Some(*id) + } else { + None + } + }) + .collect::>(); + + for id in timeout_ids { + let _ = self + .on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build())) + .await; + } + } + } +} + +#[async_trait::async_trait] +impl Mailbox for HeartbeatMailbox { + async fn send( + &self, + ch: &Channel, + mut msg: MailboxMessage, + timeout: Duration, + ) -> Result { + let message_id = self.sequence.next().await?; + + let pusher_id = match ch { + Channel::Datanode(id) => format!("{}-{}", Role::Datanode as i32, id), + Channel::Frontend(id) => format!("{}-{}", Role::Frontend as i32, id), + }; + let pushers = self.pushers.read().await; + let pusher = pushers + .get(&pusher_id) + .context(error::PusherNotFoundSnafu { pusher_id })?; + + let (tx, rx) = oneshot::channel(); + self.senders.insert(message_id, tx); + let deadline = + Duration::from_millis(common_time::util::current_time_millis() as u64) + timeout; + self.timeouts.insert(message_id, deadline); + self.timeout_notify.notify_one(); + + let header = pusher.header(); + msg.id = message_id; + let res = HeartbeatResponse { + header: Some(header), + mailbox_messages: vec![msg], + }; + + pusher.push(res).await?; + + Ok(MailboxReceiver::new(message_id, rx)) + } + + async fn on_recv(&self, id: MessageId, maybe_msg: Result) -> Result<()> { + self.timeouts.remove(&id); + + if let Some((_, tx)) = self.senders.remove(&id) { + tx.send(maybe_msg) + .map_err(|_| error::MailboxClosedSnafu { id }.build())?; + } else if let Ok(finally_msg) = maybe_msg { + let MailboxMessage { + id, + subject, + from, + to, + timestamp_millis, + .. + } = finally_msg; + warn!("The response arrived too late, id={id}, subject={subject}, from={from}, to={to}, timestamp={timestamp_millis}"); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION}; + use tokio::sync::mpsc; + + use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher}; + use crate::sequence::Sequence; + use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef}; + use crate::service::store::memory::MemStore; + + #[tokio::test] + async fn test_mailbox() { + let (mailbox, receiver) = push_msg_via_mailbox().await; + let id = receiver.message_id(); + + let resp_msg = MailboxMessage { + id, + subject: "resp-test".to_string(), + timestamp_millis: 456, + ..Default::default() + }; + + mailbox.on_recv(id, Ok(resp_msg)).await.unwrap(); + + let recv_msg = receiver.await.unwrap().unwrap(); + assert_eq!(recv_msg.id, id); + assert_eq!(recv_msg.timestamp_millis, 456); + assert_eq!(recv_msg.subject, "resp-test".to_string()); + } + + #[tokio::test] + async fn test_mailbox_timeout() { + let (_, receiver) = push_msg_via_mailbox().await; + let res = receiver.await.unwrap(); + assert!(res.is_err()); + } + + async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) { + let datanode_id = 12; + let (pusher_tx, mut pusher_rx) = mpsc::channel(16); + let res_header = RequestHeader { + protocol_version: PROTOCOL_VERSION, + ..Default::default() + }; + let pusher: Pusher = Pusher::new(pusher_tx, &Option::from(res_header)); + let handler_group = HeartbeatHandlerGroup::default(); + handler_group + .register(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher) + .await; + + let kv_store = Arc::new(MemStore::new()); + let seq = Sequence::new("test_seq", 0, 10, kv_store); + let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq); + + let msg = MailboxMessage { + id: 0, + subject: "req-test".to_string(), + timestamp_millis: 123, + ..Default::default() + }; + let ch = Channel::Datanode(datanode_id); + + let receiver = mailbox + .send(&ch, msg, Duration::from_secs(1)) + .await + .unwrap(); + + let recv_obj = pusher_rx.recv().await.unwrap().unwrap(); + assert_eq!(recv_obj.mailbox_messages[0].timestamp_millis, 123); + assert_eq!(recv_obj.mailbox_messages[0].subject, "req-test".to_string()); + + (mailbox, receiver) + } +} diff --git a/src/meta-srv/src/handler/check_leader_handler.rs b/src/meta-srv/src/handler/check_leader_handler.rs index c30f41fb18..2d83b5e8f1 100644 --- a/src/meta-srv/src/handler/check_leader_handler.rs +++ b/src/meta-srv/src/handler/check_leader_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{Error, HeartbeatRequest}; +use api::v1::meta::{Error, HeartbeatRequest, Role}; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; @@ -23,6 +23,10 @@ pub struct CheckLeaderHandler; #[async_trait::async_trait] impl HeartbeatHandler for CheckLeaderHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + async fn handle( &self, _req: &HeartbeatRequest, diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index a1274fa5b7..5fc38379e1 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::HeartbeatRequest; +use api::v1::meta::{HeartbeatRequest, Role}; use common_telemetry::debug; use super::node_stat::Stat; @@ -24,16 +24,16 @@ pub struct CollectStatsHandler; #[async_trait::async_trait] impl HeartbeatHandler for CollectStatsHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + async fn handle( &self, req: &HeartbeatRequest, - ctx: &mut Context, + _ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { - if ctx.is_skip_all() { - return Ok(()); - } - match Stat::try_from(req.clone()) { Ok(stat) => { let _ = acc.stat.insert(stat); diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 2593be500b..67b05ad62a 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -14,7 +14,7 @@ mod runner; -use api::v1::meta::HeartbeatRequest; +use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; use crate::error::Result; @@ -58,6 +58,10 @@ impl RegionFailureHandler { #[async_trait] impl HeartbeatHandler for RegionFailureHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + async fn handle( &self, _: &HeartbeatRequest, @@ -70,10 +74,6 @@ impl HeartbeatHandler for RegionFailureHandler { .await; } - if ctx.is_skip_all() { - return Ok(()); - } - let Some(stat) = acc.stat.as_ref() else { return Ok(()) }; let heartbeat = DatanodeHeartbeat { diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index c709330f70..24fa22134b 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{BatchPutRequest, HeartbeatRequest, KeyValue}; +use api::v1::meta::{BatchPutRequest, HeartbeatRequest, KeyValue, Role}; use common_telemetry::{trace, warn}; use common_time::util as time_util; use tokio::sync::mpsc::{self, Sender}; @@ -55,16 +55,16 @@ impl KeepLeaseHandler { #[async_trait::async_trait] impl HeartbeatHandler for KeepLeaseHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + async fn handle( &self, req: &HeartbeatRequest, - ctx: &mut Context, + _ctx: &mut Context, _acc: &mut HeartbeatAccumulator, ) -> Result<()> { - if ctx.is_skip_all() { - return Ok(()); - } - let HeartbeatRequest { header, peer, .. } = req; if let Some(peer) = &peer { let key = LeaseKey { diff --git a/src/meta-srv/src/handler/mailbox_handler.rs b/src/meta-srv/src/handler/mailbox_handler.rs new file mode 100644 index 0000000000..6bcc0ff4c8 --- /dev/null +++ b/src/meta-srv/src/handler/mailbox_handler.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{HeartbeatRequest, Role}; + +use crate::error::Result; +use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; + +#[derive(Default)] +pub struct MailboxHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for MailboxHandler { + fn is_acceptable(&self, _role: Role) -> bool { + true + } + + async fn handle( + &self, + req: &HeartbeatRequest, + ctx: &mut Context, + _acc: &mut HeartbeatAccumulator, + ) -> Result<()> { + if req.mailbox_messages.is_empty() { + return Ok(()); + } + + let mailbox_messages = req.mailbox_messages.clone(); + for msg in mailbox_messages { + ctx.mailbox.on_recv(msg.id, Ok(msg)).await?; + } + + Ok(()) + } +} diff --git a/src/meta-srv/src/handler/on_leader_start.rs b/src/meta-srv/src/handler/on_leader_start.rs index 163be19a35..c65ff412b6 100644 --- a/src/meta-srv/src/handler/on_leader_start.rs +++ b/src/meta-srv/src/handler/on_leader_start.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::HeartbeatRequest; +use api::v1::meta::{HeartbeatRequest, Role}; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; @@ -23,6 +23,10 @@ pub struct OnLeaderStartHandler; #[async_trait::async_trait] impl HeartbeatHandler for OnLeaderStartHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + async fn handle( &self, _req: &HeartbeatRequest, diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 385f8c8fe6..d7395e15a1 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{HeartbeatRequest, PutRequest}; +use api::v1::meta::{HeartbeatRequest, PutRequest, Role}; use dashmap::DashMap; use crate::error::Result; @@ -30,16 +30,16 @@ pub struct PersistStatsHandler { #[async_trait::async_trait] impl HeartbeatHandler for PersistStatsHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + async fn handle( &self, _req: &HeartbeatRequest, ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { - if ctx.is_skip_all() { - return Ok(()); - } - let Some(stat) = acc.stat.take() else { return Ok(()) }; let key = stat.stat_key(); @@ -78,18 +78,23 @@ mod tests { use api::v1::meta::RangeRequest; use super::*; + use crate::handler::HeartbeatMailbox; use crate::keys::StatKey; + use crate::sequence::Sequence; use crate::service::store::memory::MemStore; #[tokio::test] async fn test_handle_datanode_stats() { let in_memory = Arc::new(MemStore::new()); let kv_store = Arc::new(MemStore::new()); + let seq = Sequence::new("test_seq", 0, 10, kv_store.clone()); + let mailbox = HeartbeatMailbox::create(Arc::new(Default::default()), seq); let mut ctx = Context { datanode_lease_secs: 30, server_addr: "127.0.0.1:0000".to_string(), in_memory, kv_store, + mailbox, election: None, skip_all: Arc::new(AtomicBool::new(false)), catalog: None, diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index e040d1e87e..733c086129 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{HeartbeatRequest, ResponseHeader, PROTOCOL_VERSION}; +use api::v1::meta::{HeartbeatRequest, ResponseHeader, Role, PROTOCOL_VERSION}; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; @@ -23,6 +23,10 @@ pub struct ResponseHeaderHandler; #[async_trait::async_trait] impl HeartbeatHandler for ResponseHeaderHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + async fn handle( &self, req: &HeartbeatRequest, @@ -48,18 +52,22 @@ mod tests { use api::v1::meta::{HeartbeatResponse, RequestHeader}; use super::*; - use crate::handler::Context; + use crate::handler::{Context, HeartbeatMailbox}; + use crate::sequence::Sequence; use crate::service::store::memory::MemStore; #[tokio::test] async fn test_handle_heartbeat_resp_header() { let in_memory = Arc::new(MemStore::new()); let kv_store = Arc::new(MemStore::new()); + let seq = Sequence::new("test_seq", 0, 10, kv_store.clone()); + let mailbox = HeartbeatMailbox::create(Arc::new(Default::default()), seq); let mut ctx = Context { datanode_lease_secs: 30, server_addr: "127.0.0.1:0000".to_string(), in_memory, kv_store, + mailbox, election: None, skip_all: Arc::new(AtomicBool::new(false)), catalog: None, @@ -82,7 +90,7 @@ mod tests { let header = std::mem::take(&mut acc.header); let res = HeartbeatResponse { header, - payload: acc.into_payload(), + mailbox_messages: acc.into_mailbox_messages(), }; assert_eq!(1, res.header.unwrap().cluster_id); } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 10b95bee03..4c9fe18d3f 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -35,6 +35,7 @@ use crate::lock::DistLockRef; use crate::metadata_service::MetadataServiceRef; use crate::selector::{Selector, SelectorType}; use crate::sequence::SequenceRef; +use crate::service::mailbox::MailboxRef; use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; pub const TABLE_ID_SEQ: &str = "table_id"; @@ -73,6 +74,7 @@ pub struct Context { pub server_addr: String, pub in_memory: ResettableKvStoreRef, pub kv_store: KvStoreRef, + pub mailbox: MailboxRef, pub election: Option, pub skip_all: Arc, pub catalog: Option, @@ -116,6 +118,7 @@ pub struct MetaSrv { lock: Option, procedure_manager: ProcedureManagerRef, metadata_service: MetadataServiceRef, + mailbox: MailboxRef, } impl MetaSrv { @@ -240,12 +243,18 @@ impl MetaSrv { self.lock.clone() } + #[inline] + pub fn mailbox(&self) -> MailboxRef { + self.mailbox.clone() + } + #[inline] pub fn new_ctx(&self) -> Context { let datanode_lease_secs = self.options().datanode_lease_secs; let server_addr = self.options().server_addr.clone(); let in_memory = self.in_memory(); let kv_store = self.kv_store(); + let mailbox = self.mailbox(); let election = self.election(); let skip_all = Arc::new(AtomicBool::new(false)); Context { @@ -253,6 +262,7 @@ impl MetaSrv { server_addr, in_memory, kv_store, + mailbox, election, skip_all, catalog: None, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 48c7c23233..90f8822115 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -18,9 +18,11 @@ use std::sync::Arc; use common_procedure::local::{LocalManager, ManagerConfig}; use crate::cluster::MetaPeerClient; +use crate::handler::mailbox_handler::MailboxHandler; use crate::handler::{ - CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler, - OnLeaderStartHandler, PersistStatsHandler, RegionFailureHandler, ResponseHeaderHandler, + CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox, + KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, RegionFailureHandler, + ResponseHeaderHandler, }; use crate::lock::DistLockRef; use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef}; @@ -143,6 +145,7 @@ impl MetaSrvBuilder { group.add_handler(CheckLeaderHandler::default()).await; group.add_handler(OnLeaderStartHandler::default()).await; group.add_handler(CollectStatsHandler).await; + group.add_handler(MailboxHandler).await; group.add_handler(region_failure_handler).await; group.add_handler(PersistStatsHandler::default()).await; group @@ -158,6 +161,9 @@ impl MetaSrvBuilder { let metadata_service = metadata_service .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); + let mailbox_sequence = Sequence::new("heartbeat_mailbox", 0, 100, kv_store.clone()); + let mailbox = HeartbeatMailbox::create(handler_group.pushers(), mailbox_sequence); + MetaSrv { started, options, @@ -171,6 +177,7 @@ impl MetaSrvBuilder { lock, procedure_manager, metadata_service, + mailbox, } } } diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs index cb386f82ff..70f95fbd25 100644 --- a/src/meta-srv/src/service.rs +++ b/src/meta-srv/src/service.rs @@ -21,6 +21,7 @@ pub mod admin; pub mod cluster; mod heartbeat; pub mod lock; +pub mod mailbox; pub mod router; pub mod store; diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 83cf5e26ea..c7e7984975 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::io::ErrorKind; -use std::sync::atomic::{AtomicU64, Ordering}; use api::v1::meta::{ heartbeat_server, AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, @@ -27,11 +26,10 @@ use tonic::{Request, Response, Streaming}; use crate::error; use crate::error::Result; +use crate::handler::Pusher; use crate::metasrv::{Context, MetaSrv}; use crate::service::{GrpcResult, GrpcStream}; -static PUSHER_ID: AtomicU64 = AtomicU64::new(0); - #[async_trait::async_trait] impl heartbeat_server::Heartbeat for MetaSrv { type HeartbeatStream = GrpcStream; @@ -50,15 +48,12 @@ impl heartbeat_server::Heartbeat for MetaSrv { let mut quit = false; match msg { Ok(req) => { + let role = req.header.as_ref().map_or(0, |h| h.role); if pusher_key.is_none() { if let Some(peer) = &req.peer { - let key = format!( - "{}-{}-{}", - peer.addr, - peer.id, - PUSHER_ID.fetch_add(1, Ordering::Relaxed) - ); - handler_group.register(&key, tx.clone()).await; + let key = format!("{}-{}", role, peer.id,); + let pusher = Pusher::new(tx.clone(), &req.header); + handler_group.register(&key, pusher).await; pusher_key = Some(key); } } diff --git a/src/meta-srv/src/service/mailbox.rs b/src/meta-srv/src/service/mailbox.rs new file mode 100644 index 0000000000..32535e5947 --- /dev/null +++ b/src/meta-srv/src/service/mailbox.rs @@ -0,0 +1,76 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use api::v1::meta::MailboxMessage; +use futures::Future; +use tokio::sync::oneshot; + +use crate::error::{self, Result}; + +pub type MailboxRef = Arc; + +pub type MessageId = u64; + +pub enum Channel { + Datanode(u64), + Frontend(u64), +} + +pub struct MailboxReceiver { + message_id: MessageId, + rx: oneshot::Receiver>, +} + +impl MailboxReceiver { + pub fn new(message_id: MessageId, rx: oneshot::Receiver>) -> Self { + Self { message_id, rx } + } + + pub fn message_id(&self) -> MessageId { + self.message_id + } +} + +impl Future for MailboxReceiver { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.rx).poll(cx).map(|r| { + r.map_err(|e| { + error::MailboxReceiverSnafu { + id: self.message_id, + err_msg: e.to_string(), + } + .build() + }) + }) + } +} + +#[async_trait::async_trait] +pub trait Mailbox: Send + Sync { + async fn send( + &self, + ch: &Channel, + msg: MailboxMessage, + timeout: Duration, + ) -> Result; + + async fn on_recv(&self, id: MessageId, maybe_msg: Result) -> Result<()>; +}