feat: metasrv mailbox (#1481)

* refactor: id first in pusher_key

* feat: is_acceptable for multi roles

* feat: mailbox

* fix: channel for mailbox

* feat: impl mailbox via heartbeat

* chore: add unit test for mailbox

* chore: by cr

* chore: typo

* chore: refactor the mailbox API

* chore: br cr

* chore: check timeout interval to 10ms

* chore: add response header
This commit is contained in:
JeremyHi
2023-05-04 15:42:43 +08:00
committed by GitHub
parent 6e1bb9e458
commit d7a906e0bd
17 changed files with 507 additions and 58 deletions

2
Cargo.lock generated
View File

@@ -3821,7 +3821,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a26c40c004f998180b8acd853b22f083773f36b9#a26c40c004f998180b8acd853b22f083773f36b9"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e8abf8241c908448dce595399e89c89a40d048bd#e8abf8241c908448dce595399e89c89a40d048bd"
dependencies = [
"prost",
"tonic 0.9.2",

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a26c40c004f998180b8acd853b22f083773f36b9" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e8abf8241c908448dce595399e89c89a40d048bd" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -291,6 +291,28 @@ pub enum Error {
schema_name: String,
location: Location,
},
#[snafu(display("Pusher not found: {pusher_id}"))]
PusherNotFound {
pusher_id: String,
location: Location,
},
#[snafu(display("Failed to push message: {err_msg}"))]
PushMessage { err_msg: String, location: Location },
#[snafu(display("Mailbox already closed: {id}"))]
MailboxClosed { id: u64, location: Location },
#[snafu(display("Mailbox timeout: {id}"))]
MailboxTimeout { id: u64, location: Location },
#[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
MailboxReceiver {
id: u64,
err_msg: String,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -332,6 +354,11 @@ impl ErrorExt for Error {
| Error::SendShutdownSignal { .. }
| Error::ParseAddr { .. }
| Error::SchemaAlreadyExists { .. }
| Error::PusherNotFound { .. }
| Error::PushMessage { .. }
| Error::MailboxClosed { .. }
| Error::MailboxTimeout { .. }
| Error::MailboxReceiver { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }

View File

@@ -12,39 +12,49 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{
HeartbeatRequest, HeartbeatResponse, MailboxMessage, RequestHeader, ResponseHeader, Role,
PROTOCOL_VERSION,
};
pub use check_leader_handler::CheckLeaderHandler;
pub use collect_stats_handler::CollectStatsHandler;
use common_telemetry::{info, warn};
use dashmap::DashMap;
pub use failure_handler::RegionFailureHandler;
pub use keep_lease_handler::KeepLeaseHandler;
pub use on_leader_start::OnLeaderStartHandler;
pub use persist_stats_handler::PersistStatsHandler;
pub use response_header_handler::ResponseHeaderHandler;
use snafu::OptionExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, Notify, RwLock};
use self::instruction::Instruction;
use self::node_stat::Stat;
use crate::error::{self, Result};
use crate::metasrv::Context;
use crate::sequence::Sequence;
use crate::service::mailbox::{Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId};
mod check_leader_handler;
mod collect_stats_handler;
mod failure_handler;
mod instruction;
mod keep_lease_handler;
pub mod mailbox_handler;
pub mod node_stat;
mod on_leader_start;
mod persist_stats_handler;
mod response_header_handler;
use std::collections::BTreeMap;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, ResponseHeader};
use common_telemetry::info;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use self::instruction::Instruction;
use self::node_stat::Stat;
use crate::error::Result;
use crate::metasrv::Context;
#[async_trait::async_trait]
pub trait HeartbeatHandler: Send + Sync {
fn is_acceptable(&self, role: Role) -> bool;
async fn handle(
&self,
req: &HeartbeatRequest,
@@ -61,13 +71,46 @@ pub struct HeartbeatAccumulator {
}
impl HeartbeatAccumulator {
pub fn into_payload(self) -> Vec<Vec<u8>> {
pub fn into_mailbox_messages(self) -> Vec<MailboxMessage> {
// TODO(jiachun): to HeartbeatResponse payload
vec![]
}
}
pub type Pusher = Sender<std::result::Result<HeartbeatResponse, tonic::Status>>;
pub struct Pusher {
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
res_header: ResponseHeader,
}
impl Pusher {
pub fn new(
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
req_header: &Option<RequestHeader>,
) -> Self {
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
cluster_id: req_header.as_ref().map_or(0, |h| h.cluster_id),
..Default::default()
};
Self { sender, res_header }
}
#[inline]
pub async fn push(&self, res: HeartbeatResponse) -> Result<()> {
self.sender.send(Ok(res)).await.map_err(|e| {
error::PushMessageSnafu {
err_msg: e.to_string(),
}
.build()
})
}
#[inline]
pub fn header(&self) -> ResponseHeader {
self.res_header.clone()
}
}
#[derive(Clone, Default)]
pub struct HeartbeatHandlerGroup {
@@ -95,6 +138,10 @@ impl HeartbeatHandlerGroup {
pushers.remove(key)
}
pub fn pushers(&self) -> Arc<RwLock<BTreeMap<String, Pusher>>> {
self.pushers.clone()
}
pub async fn handle(
&self,
req: HeartbeatRequest,
@@ -103,13 +150,231 @@ impl HeartbeatHandlerGroup {
let mut acc = HeartbeatAccumulator::default();
let handlers = self.handlers.read().await;
for h in handlers.iter() {
h.handle(&req, &mut ctx, &mut acc).await?;
if ctx.is_skip_all() {
break;
}
let role = req
.header
.as_ref()
.and_then(|h| Role::from_i32(h.role))
.context(error::InvalidArgumentsSnafu {
err_msg: format!("invalid role: {:?}", req.header),
})?;
if h.is_acceptable(role) {
h.handle(&req, &mut ctx, &mut acc).await?;
}
}
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {
header,
payload: acc.into_payload(),
mailbox_messages: acc.into_mailbox_messages(),
};
Ok(res)
}
}
pub struct HeartbeatMailbox {
pushers: Arc<RwLock<BTreeMap<String, Pusher>>>,
sequence: Sequence,
senders: DashMap<MessageId, oneshot::Sender<Result<MailboxMessage>>>,
timeouts: DashMap<MessageId, Duration>,
timeout_notify: Notify,
}
impl HeartbeatMailbox {
pub fn create(
pushers: Arc<RwLock<BTreeMap<String, Pusher>>>,
sequence: Sequence,
) -> MailboxRef {
let mailbox = Arc::new(Self::new(pushers, sequence));
let timeout_checker = mailbox.clone();
common_runtime::spawn_bg(async move {
timeout_checker.check_timeout_bg(10).await;
});
mailbox
}
fn new(pushers: Arc<RwLock<BTreeMap<String, Pusher>>>, sequence: Sequence) -> Self {
Self {
pushers,
sequence,
senders: DashMap::default(),
timeouts: DashMap::default(),
timeout_notify: Notify::new(),
}
}
async fn check_timeout_bg(&self, interval_millis: u64) {
let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
loop {
interval.tick().await;
if self.timeouts.is_empty() {
self.timeout_notify.notified().await;
}
let now = Duration::from_millis(common_time::util::current_time_millis() as u64);
let timeout_ids = self
.timeouts
.iter()
.filter_map(|entry| {
let (id, deadline) = entry.pair();
if deadline < &now {
Some(*id)
} else {
None
}
})
.collect::<Vec<_>>();
for id in timeout_ids {
let _ = self
.on_recv(id, Err(error::MailboxTimeoutSnafu { id }.build()))
.await;
}
}
}
}
#[async_trait::async_trait]
impl Mailbox for HeartbeatMailbox {
async fn send(
&self,
ch: &Channel,
mut msg: MailboxMessage,
timeout: Duration,
) -> Result<MailboxReceiver> {
let message_id = self.sequence.next().await?;
let pusher_id = match ch {
Channel::Datanode(id) => format!("{}-{}", Role::Datanode as i32, id),
Channel::Frontend(id) => format!("{}-{}", Role::Frontend as i32, id),
};
let pushers = self.pushers.read().await;
let pusher = pushers
.get(&pusher_id)
.context(error::PusherNotFoundSnafu { pusher_id })?;
let (tx, rx) = oneshot::channel();
self.senders.insert(message_id, tx);
let deadline =
Duration::from_millis(common_time::util::current_time_millis() as u64) + timeout;
self.timeouts.insert(message_id, deadline);
self.timeout_notify.notify_one();
let header = pusher.header();
msg.id = message_id;
let res = HeartbeatResponse {
header: Some(header),
mailbox_messages: vec![msg],
};
pusher.push(res).await?;
Ok(MailboxReceiver::new(message_id, rx))
}
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
self.timeouts.remove(&id);
if let Some((_, tx)) = self.senders.remove(&id) {
tx.send(maybe_msg)
.map_err(|_| error::MailboxClosedSnafu { id }.build())?;
} else if let Ok(finally_msg) = maybe_msg {
let MailboxMessage {
id,
subject,
from,
to,
timestamp_millis,
..
} = finally_msg;
warn!("The response arrived too late, id={id}, subject={subject}, from={from}, to={to}, timestamp={timestamp_millis}");
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION};
use tokio::sync::mpsc;
use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
use crate::sequence::Sequence;
use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_mailbox() {
let (mailbox, receiver) = push_msg_via_mailbox().await;
let id = receiver.message_id();
let resp_msg = MailboxMessage {
id,
subject: "resp-test".to_string(),
timestamp_millis: 456,
..Default::default()
};
mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
let recv_msg = receiver.await.unwrap().unwrap();
assert_eq!(recv_msg.id, id);
assert_eq!(recv_msg.timestamp_millis, 456);
assert_eq!(recv_msg.subject, "resp-test".to_string());
}
#[tokio::test]
async fn test_mailbox_timeout() {
let (_, receiver) = push_msg_via_mailbox().await;
let res = receiver.await.unwrap();
assert!(res.is_err());
}
async fn push_msg_via_mailbox() -> (MailboxRef, MailboxReceiver) {
let datanode_id = 12;
let (pusher_tx, mut pusher_rx) = mpsc::channel(16);
let res_header = RequestHeader {
protocol_version: PROTOCOL_VERSION,
..Default::default()
};
let pusher: Pusher = Pusher::new(pusher_tx, &Option::from(res_header));
let handler_group = HeartbeatHandlerGroup::default();
handler_group
.register(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher)
.await;
let kv_store = Arc::new(MemStore::new());
let seq = Sequence::new("test_seq", 0, 10, kv_store);
let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq);
let msg = MailboxMessage {
id: 0,
subject: "req-test".to_string(),
timestamp_millis: 123,
..Default::default()
};
let ch = Channel::Datanode(datanode_id);
let receiver = mailbox
.send(&ch, msg, Duration::from_secs(1))
.await
.unwrap();
let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
assert_eq!(recv_obj.mailbox_messages[0].timestamp_millis, 123);
assert_eq!(recv_obj.mailbox_messages[0].subject, "req-test".to_string());
(mailbox, receiver)
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{Error, HeartbeatRequest};
use api::v1::meta::{Error, HeartbeatRequest, Role};
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
@@ -23,6 +23,10 @@ pub struct CheckLeaderHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for CheckLeaderHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
_req: &HeartbeatRequest,

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::{HeartbeatRequest, Role};
use common_telemetry::debug;
use super::node_stat::Stat;
@@ -24,16 +24,16 @@ pub struct CollectStatsHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for CollectStatsHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if ctx.is_skip_all() {
return Ok(());
}
match Stat::try_from(req.clone()) {
Ok(stat) => {
let _ = acc.stat.insert(stat);

View File

@@ -14,7 +14,7 @@
mod runner;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;
use crate::error::Result;
@@ -58,6 +58,10 @@ impl RegionFailureHandler {
#[async_trait]
impl HeartbeatHandler for RegionFailureHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
_: &HeartbeatRequest,
@@ -70,10 +74,6 @@ impl HeartbeatHandler for RegionFailureHandler {
.await;
}
if ctx.is_skip_all() {
return Ok(());
}
let Some(stat) = acc.stat.as_ref() else { return Ok(()) };
let heartbeat = DatanodeHeartbeat {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{BatchPutRequest, HeartbeatRequest, KeyValue};
use api::v1::meta::{BatchPutRequest, HeartbeatRequest, KeyValue, Role};
use common_telemetry::{trace, warn};
use common_time::util as time_util;
use tokio::sync::mpsc::{self, Sender};
@@ -55,16 +55,16 @@ impl KeepLeaseHandler {
#[async_trait::async_trait]
impl HeartbeatHandler for KeepLeaseHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if ctx.is_skip_all() {
return Ok(());
}
let HeartbeatRequest { header, peer, .. } = req;
if let Some(peer) = &peer {
let key = LeaseKey {

View File

@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
#[derive(Default)]
pub struct MailboxHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for MailboxHandler {
fn is_acceptable(&self, _role: Role) -> bool {
true
}
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if req.mailbox_messages.is_empty() {
return Ok(());
}
let mailbox_messages = req.mailbox_messages.clone();
for msg in mailbox_messages {
ctx.mailbox.on_recv(msg.id, Ok(msg)).await?;
}
Ok(())
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::{HeartbeatRequest, Role};
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
@@ -23,6 +23,10 @@ pub struct OnLeaderStartHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for OnLeaderStartHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
_req: &HeartbeatRequest,

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, PutRequest};
use api::v1::meta::{HeartbeatRequest, PutRequest, Role};
use dashmap::DashMap;
use crate::error::Result;
@@ -30,16 +30,16 @@ pub struct PersistStatsHandler {
#[async_trait::async_trait]
impl HeartbeatHandler for PersistStatsHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
_req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if ctx.is_skip_all() {
return Ok(());
}
let Some(stat) = acc.stat.take() else { return Ok(()) };
let key = stat.stat_key();
@@ -78,18 +78,23 @@ mod tests {
use api::v1::meta::RangeRequest;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::keys::StatKey;
use crate::sequence::Sequence;
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_handle_datanode_stats() {
let in_memory = Arc::new(MemStore::new());
let kv_store = Arc::new(MemStore::new());
let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
let mailbox = HeartbeatMailbox::create(Arc::new(Default::default()), seq);
let mut ctx = Context {
datanode_lease_secs: 30,
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_store,
mailbox,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),
catalog: None,

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, ResponseHeader, PROTOCOL_VERSION};
use api::v1::meta::{HeartbeatRequest, ResponseHeader, Role, PROTOCOL_VERSION};
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
@@ -23,6 +23,10 @@ pub struct ResponseHeaderHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for ResponseHeaderHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
req: &HeartbeatRequest,
@@ -48,18 +52,22 @@ mod tests {
use api::v1::meta::{HeartbeatResponse, RequestHeader};
use super::*;
use crate::handler::Context;
use crate::handler::{Context, HeartbeatMailbox};
use crate::sequence::Sequence;
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let in_memory = Arc::new(MemStore::new());
let kv_store = Arc::new(MemStore::new());
let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
let mailbox = HeartbeatMailbox::create(Arc::new(Default::default()), seq);
let mut ctx = Context {
datanode_lease_secs: 30,
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_store,
mailbox,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),
catalog: None,
@@ -82,7 +90,7 @@ mod tests {
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {
header,
payload: acc.into_payload(),
mailbox_messages: acc.into_mailbox_messages(),
};
assert_eq!(1, res.header.unwrap().cluster_id);
}

View File

@@ -35,6 +35,7 @@ use crate::lock::DistLockRef;
use crate::metadata_service::MetadataServiceRef;
use crate::selector::{Selector, SelectorType};
use crate::sequence::SequenceRef;
use crate::service::mailbox::MailboxRef;
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
pub const TABLE_ID_SEQ: &str = "table_id";
@@ -73,6 +74,7 @@ pub struct Context {
pub server_addr: String,
pub in_memory: ResettableKvStoreRef,
pub kv_store: KvStoreRef,
pub mailbox: MailboxRef,
pub election: Option<ElectionRef>,
pub skip_all: Arc<AtomicBool>,
pub catalog: Option<String>,
@@ -116,6 +118,7 @@ pub struct MetaSrv {
lock: Option<DistLockRef>,
procedure_manager: ProcedureManagerRef,
metadata_service: MetadataServiceRef,
mailbox: MailboxRef,
}
impl MetaSrv {
@@ -240,12 +243,18 @@ impl MetaSrv {
self.lock.clone()
}
#[inline]
pub fn mailbox(&self) -> MailboxRef {
self.mailbox.clone()
}
#[inline]
pub fn new_ctx(&self) -> Context {
let datanode_lease_secs = self.options().datanode_lease_secs;
let server_addr = self.options().server_addr.clone();
let in_memory = self.in_memory();
let kv_store = self.kv_store();
let mailbox = self.mailbox();
let election = self.election();
let skip_all = Arc::new(AtomicBool::new(false));
Context {
@@ -253,6 +262,7 @@ impl MetaSrv {
server_addr,
in_memory,
kv_store,
mailbox,
election,
skip_all,
catalog: None,

View File

@@ -18,9 +18,11 @@ use std::sync::Arc;
use common_procedure::local::{LocalManager, ManagerConfig};
use crate::cluster::MetaPeerClient;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler,
OnLeaderStartHandler, PersistStatsHandler, RegionFailureHandler, ResponseHeaderHandler,
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, RegionFailureHandler,
ResponseHeaderHandler,
};
use crate::lock::DistLockRef;
use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef};
@@ -143,6 +145,7 @@ impl MetaSrvBuilder {
group.add_handler(CheckLeaderHandler::default()).await;
group.add_handler(OnLeaderStartHandler::default()).await;
group.add_handler(CollectStatsHandler).await;
group.add_handler(MailboxHandler).await;
group.add_handler(region_failure_handler).await;
group.add_handler(PersistStatsHandler::default()).await;
group
@@ -158,6 +161,9 @@ impl MetaSrvBuilder {
let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
let mailbox_sequence = Sequence::new("heartbeat_mailbox", 0, 100, kv_store.clone());
let mailbox = HeartbeatMailbox::create(handler_group.pushers(), mailbox_sequence);
MetaSrv {
started,
options,
@@ -171,6 +177,7 @@ impl MetaSrvBuilder {
lock,
procedure_manager,
metadata_service,
mailbox,
}
}
}

View File

@@ -21,6 +21,7 @@ pub mod admin;
pub mod cluster;
mod heartbeat;
pub mod lock;
pub mod mailbox;
pub mod router;
pub mod store;

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::io::ErrorKind;
use std::sync::atomic::{AtomicU64, Ordering};
use api::v1::meta::{
heartbeat_server, AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse,
@@ -27,11 +26,10 @@ use tonic::{Request, Response, Streaming};
use crate::error;
use crate::error::Result;
use crate::handler::Pusher;
use crate::metasrv::{Context, MetaSrv};
use crate::service::{GrpcResult, GrpcStream};
static PUSHER_ID: AtomicU64 = AtomicU64::new(0);
#[async_trait::async_trait]
impl heartbeat_server::Heartbeat for MetaSrv {
type HeartbeatStream = GrpcStream<HeartbeatResponse>;
@@ -50,15 +48,12 @@ impl heartbeat_server::Heartbeat for MetaSrv {
let mut quit = false;
match msg {
Ok(req) => {
let role = req.header.as_ref().map_or(0, |h| h.role);
if pusher_key.is_none() {
if let Some(peer) = &req.peer {
let key = format!(
"{}-{}-{}",
peer.addr,
peer.id,
PUSHER_ID.fetch_add(1, Ordering::Relaxed)
);
handler_group.register(&key, tx.clone()).await;
let key = format!("{}-{}", role, peer.id,);
let pusher = Pusher::new(tx.clone(), &req.header);
handler_group.register(&key, pusher).await;
pusher_key = Some(key);
}
}

View File

@@ -0,0 +1,76 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use futures::Future;
use tokio::sync::oneshot;
use crate::error::{self, Result};
pub type MailboxRef = Arc<dyn Mailbox>;
pub type MessageId = u64;
pub enum Channel {
Datanode(u64),
Frontend(u64),
}
pub struct MailboxReceiver {
message_id: MessageId,
rx: oneshot::Receiver<Result<MailboxMessage>>,
}
impl MailboxReceiver {
pub fn new(message_id: MessageId, rx: oneshot::Receiver<Result<MailboxMessage>>) -> Self {
Self { message_id, rx }
}
pub fn message_id(&self) -> MessageId {
self.message_id
}
}
impl Future for MailboxReceiver {
type Output = Result<Result<MailboxMessage>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.rx).poll(cx).map(|r| {
r.map_err(|e| {
error::MailboxReceiverSnafu {
id: self.message_id,
err_msg: e.to_string(),
}
.build()
})
})
}
}
#[async_trait::async_trait]
pub trait Mailbox: Send + Sync {
async fn send(
&self,
ch: &Channel,
msg: MailboxMessage,
timeout: Duration,
) -> Result<MailboxReceiver>;
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
}