feat: impl pubsub in metasrv (#2045)

* feat: impl pubsub

* add test_subscriber_disconnect unit test

* chore: cr

* cr

* cr
This commit is contained in:
fys
2023-08-03 11:56:43 +08:00
committed by GitHub
parent fdd4929c8f
commit dda922507f
13 changed files with 591 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -5283,6 +5283,7 @@ dependencies = [
"serde",
"serde_json",
"servers",
"session",
"snafu",
"store-api",
"table",

View File

@@ -58,5 +58,6 @@ uuid.workspace = true
[dev-dependencies]
chrono.workspace = true
common-procedure-test = { path = "../common/procedure-test" }
session = { path = "../session", features = ["testing"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@@ -21,6 +21,8 @@ use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use tonic::Code;
use crate::pubsub::Message;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
@@ -461,6 +463,12 @@ pub enum Error {
#[snafu(display("Invalid heartbeat request: {}", err_msg))]
InvalidHeartbeatRequest { err_msg: String, location: Location },
#[snafu(display("Failed to publish message: {:?}", source))]
PublishMessage {
source: SendError<Message>,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -506,6 +514,7 @@ impl ErrorExt for Error {
| Error::UpdateTableMetadata { .. }
| Error::NoEnoughAvailableDatanode { .. }
| Error::ConvertGrpcExpr { .. }
| Error::PublishMessage { .. }
| Error::Join { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }

View File

@@ -54,6 +54,7 @@ pub mod mailbox_handler;
pub mod node_stat;
mod on_leader_start_handler;
mod persist_stats_handler;
pub mod publish_heartbeat_handler;
pub(crate) mod region_lease_handler;
mod response_header_handler;

View File

@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::pubsub::{Message, PublishRef};
/// Heartbeat handler that owns a [`PublishRef`] and uses it to publish the
/// heartbeat requests it handles.
pub struct PublishHeartbeatHandler {
    /// Publisher the heartbeat messages are sent through.
    publish: PublishRef,
}

impl PublishHeartbeatHandler {
    /// Creates a handler that publishes through `publish`.
    pub fn new(publish: PublishRef) -> PublishHeartbeatHandler {
        Self { publish }
    }
}
#[async_trait]
impl HeartbeatHandler for PublishHeartbeatHandler {
    /// Accepts heartbeats only when the sender's role is `Role::Datanode`.
    fn is_acceptable(&self, role: Role) -> bool {
        matches!(role, Role::Datanode)
    }

    /// Publishes a boxed clone of `req` as a [`Message::Heartbeat`].
    ///
    /// The context and accumulator are not used, and the call always
    /// returns `Ok(())`.
    async fn handle(
        &self,
        req: &HeartbeatRequest,
        _: &mut Context,
        _: &mut HeartbeatAccumulator,
    ) -> Result<()> {
        let heartbeat = Box::new(req.clone());
        self.publish.send_msg(Message::Heartbeat(heartbeat)).await;
        Ok(())
    }
}

View File

@@ -32,6 +32,7 @@ mod metrics;
#[cfg(feature = "mock")]
pub mod mocks;
pub mod procedure;
pub mod pubsub;
pub mod selector;
mod sequence;
pub mod service;

View File

@@ -38,6 +38,7 @@ use crate::error::{RecoverProcedureSnafu, Result};
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::metadata_service::MetadataServiceRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::sequence::SequenceRef;
use crate::service::mailbox::MailboxRef;
@@ -177,6 +178,7 @@ pub struct MetaSrv {
ddl_manager: DdlManagerRef,
table_metadata_manager: TableMetadataManagerRef,
greptimedb_telemerty_task: Arc<GreptimeDBTelemetryTask>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
}
impl MetaSrv {
@@ -196,6 +198,7 @@ impl MetaSrv {
let procedure_manager = self.procedure_manager.clone();
let in_memory = self.in_memory.clone();
let leader_cached_kv_store = self.leader_cached_kv_store.clone();
let subscribe_manager = self.subscribe_manager().cloned();
let mut rx = election.subscribe_leader_change();
let task_handler = self.greptimedb_telemerty_task.clone();
let _handle = common_runtime::spawn_bg(async move {
@@ -219,6 +222,12 @@ impl MetaSrv {
});
}
LeaderChangeMessage::StepDown(leader) => {
if let Some(sub_manager) = subscribe_manager.clone() {
info!("Leader changed, un_subscribe all");
if let Err(e) = sub_manager.un_subscribe_all() {
error!("Failed to un_subscribe all, error: {}", e);
}
}
error!("Leader :{:?} step down", leader);
let _ = task_handler.stop().await.map_err(|e| {
debug!(
@@ -329,6 +338,14 @@ impl MetaSrv {
&self.table_metadata_manager
}
/// Returns the publisher half of the pubsub pair, or `None` when
/// `self.pubsub` is `None`.
pub fn publish(&self) -> Option<&PublishRef> {
    self.pubsub.as_ref().map(|(publish, _)| publish)
}
/// Returns the subscribe-manager half of the pubsub pair, or `None` when
/// `self.pubsub` is `None`.
pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> {
    self.pubsub.as_ref().map(|(_, subscribe_manager)| subscribe_manager)
}
#[inline]
pub fn new_ctx(&self) -> Context {
let server_addr = self.options().server_addr.clone();
@@ -339,6 +356,7 @@ impl MetaSrv {
let mailbox = self.mailbox.clone();
let election = self.election.clone();
let skip_all = Arc::new(AtomicBool::new(false));
Context {
server_addr,
in_memory,

View File

@@ -26,6 +26,7 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::ddl::{DdlManager, DdlManagerRef};
use crate::error::Result;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
@@ -40,6 +41,7 @@ use crate::metasrv::{
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::mailbox::MailboxRef;
@@ -60,6 +62,7 @@ pub struct MetaSrvBuilder {
lock: Option<DistLockRef>,
metadata_service: Option<MetadataServiceRef>,
datanode_clients: Option<Arc<DatanodeClients>>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
}
impl MetaSrvBuilder {
@@ -75,6 +78,7 @@ impl MetaSrvBuilder {
lock: None,
metadata_service: None,
datanode_clients: None,
pubsub: None,
}
}
@@ -128,6 +132,11 @@ impl MetaSrvBuilder {
self
}
/// Stores the publisher and subscribe manager as a pair in the builder and
/// returns the builder for further chaining.
pub fn pubsub(mut self, publish: PublishRef, subscribe_manager: SubscribeManagerRef) -> Self {
    let suite = (publish, subscribe_manager);
    self.pubsub = Some(suite);
    self
}
pub async fn build(self) -> Result<MetaSrv> {
let started = Arc::new(AtomicBool::new(false));
@@ -142,6 +151,7 @@ impl MetaSrvBuilder {
lock,
metadata_service,
datanode_clients,
pubsub,
} = self;
let options = options.unwrap_or_default();
@@ -215,6 +225,11 @@ impl MetaSrvBuilder {
}
group.add_handler(RegionLeaseHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;
if let Some((publish, _)) = pubsub.as_ref() {
group
.add_handler(PublishHeartbeatHandler::new(publish.clone()))
.await;
}
group
}
};
@@ -237,6 +252,7 @@ impl MetaSrvBuilder {
ddl_manager,
table_metadata_manager,
greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await,
pubsub,
})
}
}

View File

@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::HeartbeatRequest;
mod publish;
mod subscribe_manager;
mod subscriber;
#[cfg(test)]
mod tests;
pub use publish::{DefaultPublish, Publish, PublishRef};
pub use subscribe_manager::{
AddSubRequest, DefaultSubscribeManager, SubscribeManager, SubscribeManagerRef, SubscribeQuery,
UnSubRequest,
};
pub use subscriber::{Subscriber, SubscriberRef, Transport};
/// Topics a subscriber can register for; the available set is determined by
/// what meta is able to publish.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Topic {
    /// Topic carrying heartbeat messages.
    Heartbeat,
}
/// A message that can be published to subscribers.
#[derive(Debug, Clone)]
pub enum Message {
    /// A boxed heartbeat request.
    Heartbeat(Box<HeartbeatRequest>),
}

impl Message {
    /// Returns the [`Topic`] this message belongs to.
    pub fn topic(&self) -> Topic {
        match *self {
            Self::Heartbeat(_) => Topic::Heartbeat,
        }
    }
}

View File

@@ -0,0 +1,72 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use common_telemetry::error;
use crate::pubsub::{Message, SubscribeManager, Transport, UnSubRequest};
/// Publishing interface for the other modules of meta: a single `send_msg`
/// method that takes the [Message] to publish and returns nothing.
#[async_trait::async_trait]
pub trait Publish: Send + Sync {
    /// Publishes `message`.
    async fn send_msg(&self, message: Message);
}

/// Shared, type-erased handle to a [Publish] implementation.
pub type PublishRef = Arc<dyn Publish>;
/// Default [Publish] implementation, parameterized over the subscribe
/// manager type `M` and the transport type `T`.
pub struct DefaultPublish<M, T> {
    /// Shared handle to the subscribe manager.
    subscribe_manager: Arc<M>,
    /// Marker tying the publisher to its transport type; stores no data.
    _transport: PhantomData<T>,
}

impl<M, T> DefaultPublish<M, T> {
    /// Creates a publisher backed by `subscribe_manager`.
    pub fn new(subscribe_manager: Arc<M>) -> Self {
        let _transport = PhantomData;
        Self {
            subscribe_manager,
            _transport,
        }
    }
}
#[async_trait::async_trait]
impl<M, T> Publish for DefaultPublish<M, T>
where
M: SubscribeManager<T>,
T: Transport + Debug,
{
async fn send_msg(&self, message: Message) {
let sub_list = self
.subscribe_manager
.subscribers_by_topic(&message.topic());
for sub in sub_list {
if sub.transport_msg(message.clone()).await.is_err() {
// If an error occurs, we consider the subscriber offline,
// so un_subscribe here.
let req = UnSubRequest {
subscriber_id: sub.id(),
};
if let Err(e) = self.subscribe_manager.un_subscribe(req.clone()) {
error!(e; "failed to un_subscribe, req: {:?}", req);
}
}
}
}
}

View File

@@ -0,0 +1,115 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::info;
use dashmap::DashMap;
use tokio::sync::mpsc::Sender;
use crate::error::Result;
use crate::pubsub::{Message, Subscriber, SubscriberRef, Topic, Transport};
/// Read-only lookup of subscribers, generic over the transport type `T`.
pub trait SubscribeQuery<T>: Send + Sync {
    /// Returns the subscribers registered for `topic`.
    fn subscribers_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>>;
}
/// Adds and removes subscriptions on top of the lookup provided by
/// [`SubscribeQuery`].
pub trait SubscribeManager<T>: SubscribeQuery<T> {
    /// Registers the subscriber in `req` for the topics listed in `req`.
    fn subscribe(&self, req: AddSubRequest<T>) -> Result<()>;

    /// Removes the subscriber identified by `req`.
    fn un_subscribe(&self, req: UnSubRequest) -> Result<()>;

    /// Removes every subscription.
    fn un_subscribe_all(&self) -> Result<()>;
}

/// Shared, type-erased subscribe manager whose transport is a
/// `Sender<Message>`.
pub type SubscribeManagerRef = Arc<dyn SubscribeManager<Sender<Message>>>;
/// Request to register a subscriber for a list of topics.
pub struct AddSubRequest<T> {
    /// Topics the subscriber is registered for.
    pub topic_list: Vec<Topic>,
    /// The subscriber being registered.
    pub subscriber: Subscriber<T>,
}
/// Request to remove a subscriber, identified by its id.
#[derive(Clone, Debug)]
pub struct UnSubRequest {
    /// Id of the subscriber to remove.
    pub subscriber_id: u32,
}
/// Subscribe manager that keeps, per topic, the list of subscribers
/// registered for it.
pub struct DefaultSubscribeManager<T> {
    /// Map from topic to the subscribers registered for that topic.
    topic2sub: DashMap<Topic, Vec<Arc<Subscriber<T>>>>,
}

impl<T> Default for DefaultSubscribeManager<T> {
    /// Creates a manager with no subscriptions.
    fn default() -> Self {
        let topic2sub = DashMap::new();
        Self { topic2sub }
    }
}
impl<T: Transport> SubscribeQuery<T> for DefaultSubscribeManager<T> {
    /// Returns a copy of the subscriber list stored for `topic`, or an
    /// empty list when the topic has no entry.
    fn subscribers_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>> {
        match self.topic2sub.get(topic) {
            Some(subscribers) => subscribers.value().clone(),
            None => Vec::new(),
        }
    }
}
impl<T: Transport> SubscribeManager<T> for DefaultSubscribeManager<T> {
    /// Logs the request, then appends the subscriber to the list of every
    /// topic in `req.topic_list`, creating lists as needed.
    fn subscribe(&self, req: AddSubRequest<T>) -> Result<()> {
        let topics = req.topic_list;
        let subscriber = Arc::new(req.subscriber);

        info!(
            "Add a subscription, subscriber_id: {}, subscriber_name: {}, topic list: {:?}",
            subscriber.id(),
            subscriber.name(),
            topics
        );

        for topic in topics {
            self.topic2sub
                .entry(topic)
                .or_default()
                .push(Arc::clone(&subscriber));
        }

        Ok(())
    }

    /// Logs the request, then removes the subscriber with the requested id
    /// from the list of every topic.
    fn un_subscribe(&self, req: UnSubRequest) -> Result<()> {
        let subscriber_id = req.subscriber_id;

        info!("Add a un_subscription, subscriber_id: {}", subscriber_id);

        self.topic2sub
            .iter_mut()
            .for_each(|mut subscribers| subscribers.retain(|sub| sub.id() != subscriber_id));

        Ok(())
    }

    /// Clears the whole topic map, dropping every subscription.
    fn un_subscribe_all(&self) -> Result<()> {
        self.topic2sub.clear();
        Ok(())
    }
}

View File

@@ -0,0 +1,75 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use snafu::ResultExt;
use tokio::sync::mpsc::Sender;
use crate::error::{self, Result};
use crate::pubsub::Message;
/// A subscriber together with the transport used to reach it.
#[derive(Debug)]
pub struct Subscriber<T> {
    /// Globally unique id of the subscriber, assigned by leader meta.
    id: u32,
    /// Name supplied by the subscriber itself.
    name: String,
    /// Channel used to deliver messages from meta to the subscriber.
    transporter: T,
}

/// Shared handle to a [`Subscriber`].
pub type SubscriberRef<T> = Arc<Subscriber<T>>;

impl<T> Subscriber<T> {
    /// Builds a subscriber from its id, name and transport.
    pub fn new(id: u32, name: impl Into<String>, transporter: T) -> Self {
        Self {
            id,
            name: name.into(),
            transporter,
        }
    }

    /// Returns the subscriber's id.
    pub fn id(&self) -> u32 {
        self.id
    }

    /// Returns the subscriber's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }
}
impl<T: Transport> Subscriber<T> {
    /// Hands `message` to this subscriber's transport and returns the
    /// transport's result.
    pub async fn transport_msg(&self, message: Message) -> Result<()> {
        let Self { transporter, .. } = self;
        transporter.transport_msg(message).await
    }
}
/// Defines how a message is delivered from meta to a subscriber.
#[async_trait::async_trait]
pub trait Transport: Send + Sync {
    /// Delivers `msg`, returning an error when delivery fails.
    async fn transport_msg(&self, msg: Message) -> Result<()>;
}
#[async_trait::async_trait]
impl Transport for Sender<Message> {
async fn transport_msg(&self, msg: Message) -> Result<()> {
self.send(msg).await.context(error::PublishMessageSnafu)
}
}

View File

@@ -0,0 +1,185 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::HeartbeatRequest;
use tokio::sync::mpsc::{Receiver, Sender};
use super::DefaultSubscribeManager;
use crate::pubsub::{
AddSubRequest, DefaultPublish, Message, Publish, SubscribeManager, SubscribeQuery, Subscriber,
Topic, UnSubRequest,
};
/// Two subscribers each receive every published heartbeat, and their
/// channels close once they are un-subscribed.
#[tokio::test]
async fn test_pubsub() {
    let manager = Arc::new(DefaultSubscribeManager::default());

    let (first, mut first_rx) = mock_subscriber(1, "tidigong");
    manager
        .subscribe(AddSubRequest {
            topic_list: vec![Topic::Heartbeat],
            subscriber: first,
        })
        .unwrap();

    let (second, mut second_rx) = mock_subscriber(2, "gcrm");
    manager
        .subscribe(AddSubRequest {
            topic_list: vec![Topic::Heartbeat],
            subscriber: second,
        })
        .unwrap();

    let message_number: usize = 5;
    let publisher_manager = Arc::clone(&manager);
    tokio::spawn(async move {
        let publisher =
            DefaultPublish::<DefaultSubscribeManager<Sender<Message>>, Sender<Message>>::new(
                publisher_manager,
            );
        for _ in 0..message_number {
            publisher.send_msg(mock_message()).await;
        }
    });

    for _ in 0..message_number {
        check_message(first_rx.recv().await.unwrap());
        check_message(second_rx.recv().await.unwrap());
    }

    // Removing subscriber 1 leaves its receiver with nothing more to read.
    manager
        .un_subscribe(UnSubRequest { subscriber_id: 1 })
        .unwrap();
    assert!(first_rx.recv().await.is_none());

    // Removing everything does the same for subscriber 2.
    manager.un_subscribe_all().unwrap();
    assert!(second_rx.recv().await.is_none());
}
/// Subscribers whose receivers have been dropped are un-subscribed by the
/// publisher when it tries to deliver to them.
#[tokio::test]
async fn test_subscriber_disconnect() {
    let manager = Arc::new(DefaultSubscribeManager::default());

    let (subscriber1, rx1) = mock_subscriber(1, "tidigong");
    let req = AddSubRequest {
        topic_list: vec![Topic::Heartbeat],
        subscriber: subscriber1,
    };
    manager.subscribe(req).unwrap();

    let (subscriber2, rx2) = mock_subscriber(2, "gcrm");
    let req = AddSubRequest {
        topic_list: vec![Topic::Heartbeat],
        subscriber: subscriber2,
    };
    manager.subscribe(req).unwrap();

    // Simulate subscriber disconnection before the publisher task exists,
    // so every send fails no matter when the runtime schedules that task.
    // Dropping after the spawn only works if the task has not run yet.
    std::mem::drop(rx1);
    std::mem::drop(rx2);

    let manager_clone = manager.clone();
    let message_number: usize = 5;
    let join = tokio::spawn(async move {
        let publisher: DefaultPublish<DefaultSubscribeManager<Sender<Message>>, Sender<Message>> =
            DefaultPublish::new(manager_clone);
        for _ in 0..message_number {
            publisher.send_msg(mock_message()).await;
        }
    });

    join.await.unwrap();

    let subscriber_list = manager.subscribers_by_topic(&Topic::Heartbeat);
    assert!(subscriber_list.is_empty());
}
/// A heartbeat message reports the heartbeat topic.
#[test]
fn test_message() {
    let topic = Message::Heartbeat(Box::default()).topic();
    assert_eq!(Topic::Heartbeat, topic);
}
/// The heartbeat subscriber count grows with each subscribe and shrinks
/// with each un_subscribe.
#[test]
fn test_sub_manager() {
    let manager: DefaultSubscribeManager<Sender<Message>> = DefaultSubscribeManager::default();
    let heartbeat_subscribers = || manager.subscribers_by_topic(&Topic::Heartbeat).len();

    manager
        .subscribe(AddSubRequest {
            topic_list: vec![Topic::Heartbeat],
            subscriber: mock_subscriber(1, "tidigong").0,
        })
        .unwrap();
    assert_eq!(1, heartbeat_subscribers());

    manager
        .subscribe(AddSubRequest {
            topic_list: vec![Topic::Heartbeat],
            subscriber: mock_subscriber(2, "gcrm").0,
        })
        .unwrap();
    assert_eq!(2, heartbeat_subscribers());

    manager
        .un_subscribe(UnSubRequest { subscriber_id: 1 })
        .unwrap();
    assert_eq!(1, heartbeat_subscribers());

    manager
        .un_subscribe(UnSubRequest { subscriber_id: 2 })
        .unwrap();
    assert_eq!(0, heartbeat_subscribers());
}
/// A subscriber exposes its id and name and forwards messages to its
/// channel.
#[tokio::test]
async fn test_subscriber() {
    let (subscriber, mut rx) = mock_subscriber(1, "tudigong");
    assert_eq!(1, subscriber.id());
    assert_eq!("tudigong", subscriber.name());

    subscriber.transport_msg(mock_message()).await.unwrap();

    // The message must arrive and carry the mocked heartbeat payload.
    let received = rx.recv().await;
    assert!(received.is_some());
    let Message::Heartbeat(hb) = received.unwrap();
    assert_eq!(123, hb.duration_since_epoch);
}
/// Builds a subscriber backed by a channel of capacity 1024 and returns it
/// together with the receiving end of that channel.
fn mock_subscriber(id: u32, name: &str) -> (Subscriber<Sender<Message>>, Receiver<Message>) {
    let (sender, receiver) = tokio::sync::mpsc::channel(1024);
    (Subscriber::new(id, name, sender), receiver)
}
/// Builds a heartbeat message whose `duration_since_epoch` is 123 and whose
/// other fields are defaults.
fn mock_message() -> Message {
    let heartbeat = HeartbeatRequest {
        duration_since_epoch: 123,
        ..Default::default()
    };
    Message::Heartbeat(Box::new(heartbeat))
}
/// Asserts that `message` is the heartbeat produced by `mock_message`.
fn check_message(message: Message) {
    let Message::Heartbeat(hb) = message;
    assert_eq!(123, hb.duration_since_epoch);
}