refactor(naming): use the better naming for pubsub (#3960)

This commit is contained in:
zyy17
2024-05-17 11:00:15 +08:00
committed by GitHub
parent 5c0a530ad1
commit ca409a732f
7 changed files with 94 additions and 96 deletions

View File

@@ -18,15 +18,15 @@ use async_trait::async_trait;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::pubsub::{Message, PublishRef};
use crate::pubsub::{Message, PublisherRef};
pub struct PublishHeartbeatHandler {
publish: PublishRef,
publisher: PublisherRef,
}
impl PublishHeartbeatHandler {
pub fn new(publish: PublishRef) -> PublishHeartbeatHandler {
PublishHeartbeatHandler { publish }
pub fn new(publisher: PublisherRef) -> PublishHeartbeatHandler {
PublishHeartbeatHandler { publisher }
}
}
@@ -43,7 +43,7 @@ impl HeartbeatHandler for PublishHeartbeatHandler {
_: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let msg = Message::Heartbeat(Box::new(req.clone()));
self.publish.send_msg(msg).await;
self.publisher.publish(msg).await;
Ok(HandleControl::Continue)
}

View File

@@ -53,7 +53,7 @@ use crate::handler::HeartbeatHandlerGroup;
use crate::lease::lookup_alive_datanode_peer;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
@@ -256,7 +256,7 @@ pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
pub struct MetaStateHandler {
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
subscribe_manager: Option<SubscribeManagerRef>,
subscribe_manager: Option<SubscriptionManagerRef>,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
state: StateRef,
@@ -295,7 +295,7 @@ impl MetaStateHandler {
if let Some(sub_manager) = self.subscribe_manager.clone() {
info!("Leader changed, un_subscribe all");
if let Err(e) = sub_manager.un_subscribe_all() {
if let Err(e) = sub_manager.unsubscribe_all() {
error!("Failed to un_subscribe all, error: {}", e);
}
}
@@ -351,7 +351,7 @@ impl Metasrv {
let procedure_manager = self.procedure_manager.clone();
let in_memory = self.in_memory.clone();
let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
let subscribe_manager = self.subscribe_manager();
let subscribe_manager = self.subscription_manager();
let mut rx = election.subscribe_leader_change();
let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
greptimedb_telemetry_task
@@ -540,12 +540,12 @@ impl Metasrv {
&self.region_migration_manager
}
pub fn publish(&self) -> Option<PublishRef> {
self.plugins.get::<PublishRef>()
pub fn publish(&self) -> Option<PublisherRef> {
self.plugins.get::<PublisherRef>()
}
pub fn subscribe_manager(&self) -> Option<SubscribeManagerRef> {
self.plugins.get::<SubscribeManagerRef>()
pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
self.plugins.get::<SubscriptionManagerRef>()
}
pub fn plugins(&self) -> &Plugins {

View File

@@ -66,7 +66,7 @@ use crate::metasrv::{
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::pubsub::PublishRef;
use crate::pubsub::PublisherRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
@@ -320,7 +320,7 @@ impl MetasrvBuilder {
let publish_heartbeat_handler = plugins
.clone()
.and_then(|plugins| plugins.get::<PublishRef>())
.and_then(|plugins| plugins.get::<PublisherRef>())
.map(|publish| PublishHeartbeatHandler::new(publish.clone()));
let region_lease_handler = RegionLeaseHandler::new(

View File

@@ -20,10 +20,10 @@ mod subscriber;
#[cfg(test)]
mod tests;
pub use publish::{DefaultPublish, Publish, PublishRef};
pub use publish::{DefaultPublisher, Publisher, PublisherRef};
pub use subscribe_manager::{
AddSubRequest, DefaultSubscribeManager, SubscribeManager, SubscribeManagerRef, SubscribeQuery,
UnSubRequest,
DefaultSubscribeManager, SubscribeRequest, SubscriptionManager, SubscriptionManagerRef,
SubscriptionQuery, UnsubscribeRequest,
};
pub use subscriber::{Subscriber, SubscriberRef, Transport};

View File

@@ -18,53 +18,53 @@ use std::sync::Arc;
use common_telemetry::error;
use crate::pubsub::{Message, SubscribeManager, Transport, UnSubRequest};
use crate::pubsub::{Message, SubscriptionManager, Transport, UnsubscribeRequest};
/// This trait provides a `send_msg` method that can be used by other modules
/// This trait provides a `publish` method that can be used by other modules
/// of meta to publish [Message].
#[async_trait::async_trait]
pub trait Publish: Send + Sync {
async fn send_msg(&self, message: Message);
pub trait Publisher: Send + Sync {
async fn publish(&self, message: Message);
}
pub type PublishRef = Arc<dyn Publish>;
pub type PublisherRef = Arc<dyn Publisher>;
/// The default implementation of [Publish]
pub struct DefaultPublish<M, T> {
subscribe_manager: Arc<M>,
/// The default implementation of [Publisher]
pub struct DefaultPublisher<M, T> {
subscription_manager: Arc<M>,
_transport: PhantomData<T>,
}
impl<M, T> DefaultPublish<M, T> {
pub fn new(subscribe_manager: Arc<M>) -> Self {
impl<M, T> DefaultPublisher<M, T> {
pub fn new(subscription_manager: Arc<M>) -> Self {
Self {
subscribe_manager,
subscription_manager,
_transport: PhantomData,
}
}
}
#[async_trait::async_trait]
impl<M, T> Publish for DefaultPublish<M, T>
impl<M, T> Publisher for DefaultPublisher<M, T>
where
M: SubscribeManager<T>,
M: SubscriptionManager<T>,
T: Transport + Debug,
{
async fn send_msg(&self, message: Message) {
let sub_list = self
.subscribe_manager
async fn publish(&self, message: Message) {
let subscribers = self
.subscription_manager
.subscribers_by_topic(&message.topic());
for sub in sub_list {
if sub.transport_msg(message.clone()).await.is_err() {
for subscriber in subscribers {
if subscriber.transport_msg(message.clone()).await.is_err() {
// If an error occurs, we consider the subscriber offline,
// so un_subscribe here.
let req = UnSubRequest {
subscriber_id: sub.id(),
let req = UnsubscribeRequest {
subscriber_id: subscriber.id(),
};
if let Err(e) = self.subscribe_manager.un_subscribe(req.clone()) {
error!(e; "failed to un_subscribe, req: {:?}", req);
if let Err(e) = self.subscription_manager.unsubscribe(req.clone()) {
error!(e; "failed to unsubscribe, req: {:?}", req);
}
}
}

View File

@@ -21,94 +21,92 @@ use tokio::sync::mpsc::Sender;
use crate::error::Result;
use crate::pubsub::{Message, Subscriber, SubscriberRef, Topic, Transport};
pub trait SubscribeQuery<T>: Send + Sync {
pub trait SubscriptionQuery<T>: Send + Sync {
fn subscribers_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>>;
}
pub trait SubscribeManager<T>: SubscribeQuery<T> {
fn subscribe(&self, req: AddSubRequest<T>) -> Result<()>;
pub trait SubscriptionManager<T>: SubscriptionQuery<T> {
fn subscribe(&self, req: SubscribeRequest<T>) -> Result<()>;
fn un_subscribe(&self, req: UnSubRequest) -> Result<()>;
fn unsubscribe(&self, req: UnsubscribeRequest) -> Result<()>;
fn un_subscribe_all(&self) -> Result<()>;
fn unsubscribe_all(&self) -> Result<()>;
}
pub type SubscribeManagerRef = Arc<dyn SubscribeManager<Sender<Message>>>;
pub type SubscriptionManagerRef = Arc<dyn SubscriptionManager<Sender<Message>>>;
pub struct AddSubRequest<T> {
pub topic_list: Vec<Topic>,
pub struct SubscribeRequest<T> {
pub topics: Vec<Topic>,
pub subscriber: Subscriber<T>,
}
#[derive(Debug, Clone)]
pub struct UnSubRequest {
pub struct UnsubscribeRequest {
pub subscriber_id: u32,
}
pub struct DefaultSubscribeManager<T> {
topic2sub: DashMap<Topic, Vec<Arc<Subscriber<T>>>>,
topic_to_subscribers: DashMap<Topic, Vec<Arc<Subscriber<T>>>>,
}
impl<T> Default for DefaultSubscribeManager<T> {
fn default() -> Self {
Self {
topic2sub: DashMap::new(),
topic_to_subscribers: DashMap::new(),
}
}
}
impl<T> SubscribeQuery<T> for DefaultSubscribeManager<T>
impl<T> SubscriptionQuery<T> for DefaultSubscribeManager<T>
where
T: Transport,
{
fn subscribers_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>> {
self.topic2sub
self.topic_to_subscribers
.get(topic)
.map(|list_ref| list_ref.clone())
.unwrap_or_default()
}
}
impl<T> SubscribeManager<T> for DefaultSubscribeManager<T>
impl<T> SubscriptionManager<T> for DefaultSubscribeManager<T>
where
T: Transport,
{
fn subscribe(&self, req: AddSubRequest<T>) -> Result<()> {
let AddSubRequest {
topic_list,
subscriber,
} = req;
fn subscribe(&self, req: SubscribeRequest<T>) -> Result<()> {
let SubscribeRequest { topics, subscriber } = req;
info!(
"Add a subscription, subscriber_id: {}, subscriber_name: {}, topic list: {:?}",
"Add a subscriber, subscriber_id: {}, subscriber_name: {}, topics: {:?}",
subscriber.id(),
subscriber.name(),
topic_list
topics
);
let subscriber = Arc::new(subscriber);
for topic in topic_list {
let mut entry = self.topic2sub.entry(topic).or_default();
for topic in topics {
let mut entry = self.topic_to_subscribers.entry(topic).or_default();
entry.push(subscriber.clone());
}
Ok(())
}
fn un_subscribe(&self, req: UnSubRequest) -> Result<()> {
let UnSubRequest { subscriber_id } = req;
fn unsubscribe(&self, req: UnsubscribeRequest) -> Result<()> {
let UnsubscribeRequest { subscriber_id } = req;
info!("Add a un_subscription, subscriber_id: {}", subscriber_id);
info!("Remove a subscriber, subscriber_id: {}", subscriber_id);
for mut sub_list in self.topic2sub.iter_mut() {
sub_list.retain(|subscriber| subscriber.id() != subscriber_id)
for mut subscribers in self.topic_to_subscribers.iter_mut() {
subscribers.retain(|subscriber| subscriber.id() != subscriber_id)
}
Ok(())
}
fn un_subscribe_all(&self) -> Result<()> {
self.topic2sub.clear();
fn unsubscribe_all(&self) -> Result<()> {
self.topic_to_subscribers.clear();
Ok(())
}

View File

@@ -19,8 +19,8 @@ use tokio::sync::mpsc::{Receiver, Sender};
use super::DefaultSubscribeManager;
use crate::pubsub::{
AddSubRequest, DefaultPublish, Message, Publish, SubscribeManager, SubscribeQuery, Subscriber,
Topic, UnSubRequest,
DefaultPublisher, Message, Publisher, SubscribeRequest, Subscriber, SubscriptionManager,
SubscriptionQuery, Topic, UnsubscribeRequest,
};
#[tokio::test]
@@ -28,15 +28,15 @@ async fn test_pubsub() {
let manager = Arc::new(DefaultSubscribeManager::default());
let (subscriber1, mut rx1) = mock_subscriber(1, "tidigong");
let req = AddSubRequest {
topic_list: vec![Topic::Heartbeat],
let req = SubscribeRequest {
topics: vec![Topic::Heartbeat],
subscriber: subscriber1,
};
manager.subscribe(req).unwrap();
let (subscriber2, mut rx2) = mock_subscriber(2, "gcrm");
let req = AddSubRequest {
topic_list: vec![Topic::Heartbeat],
let req = SubscribeRequest {
topics: vec![Topic::Heartbeat],
subscriber: subscriber2,
};
manager.subscribe(req).unwrap();
@@ -44,10 +44,10 @@ async fn test_pubsub() {
let manager_clone = manager.clone();
let message_number: usize = 5;
tokio::spawn(async move {
let publisher: DefaultPublish<DefaultSubscribeManager<Sender<Message>>, Sender<Message>> =
DefaultPublish::new(manager_clone);
let publisher: DefaultPublisher<DefaultSubscribeManager<Sender<Message>>, Sender<Message>> =
DefaultPublisher::new(manager_clone);
for _ in 0..message_number {
publisher.send_msg(mock_message()).await;
publisher.publish(mock_message()).await;
}
});
@@ -59,12 +59,12 @@ async fn test_pubsub() {
}
manager
.un_subscribe(UnSubRequest { subscriber_id: 1 })
.unsubscribe(UnsubscribeRequest { subscriber_id: 1 })
.unwrap();
let may_msg = rx1.recv().await;
assert!(may_msg.is_none());
manager.un_subscribe_all().unwrap();
manager.unsubscribe_all().unwrap();
let may_msg = rx2.recv().await;
assert!(may_msg.is_none());
}
@@ -74,15 +74,15 @@ async fn test_subscriber_disconnect() {
let manager = Arc::new(DefaultSubscribeManager::default());
let (subscriber1, rx1) = mock_subscriber(1, "tidigong");
let req = AddSubRequest {
topic_list: vec![Topic::Heartbeat],
let req = SubscribeRequest {
topics: vec![Topic::Heartbeat],
subscriber: subscriber1,
};
manager.subscribe(req).unwrap();
let (subscriber2, rx2) = mock_subscriber(2, "gcrm");
let req = AddSubRequest {
topic_list: vec![Topic::Heartbeat],
let req = SubscribeRequest {
topics: vec![Topic::Heartbeat],
subscriber: subscriber2,
};
manager.subscribe(req).unwrap();
@@ -90,10 +90,10 @@ async fn test_subscriber_disconnect() {
let manager_clone = manager.clone();
let message_number: usize = 5;
let join = tokio::spawn(async move {
let publisher: DefaultPublish<DefaultSubscribeManager<Sender<Message>>, Sender<Message>> =
DefaultPublish::new(manager_clone);
let publisher: DefaultPublisher<DefaultSubscribeManager<Sender<Message>>, Sender<Message>> =
DefaultPublisher::new(manager_clone);
for _ in 0..message_number {
publisher.send_msg(mock_message()).await;
publisher.publish(mock_message()).await;
}
});
@@ -118,8 +118,8 @@ fn test_sub_manager() {
let manager = DefaultSubscribeManager::default();
let subscriber = mock_subscriber(1, "tidigong").0;
let req = AddSubRequest {
topic_list: vec![Topic::Heartbeat],
let req = SubscribeRequest {
topics: vec![Topic::Heartbeat],
subscriber,
};
manager.subscribe(req).unwrap();
@@ -127,21 +127,21 @@ fn test_sub_manager() {
assert_eq!(1, ret.len());
let subscriber = mock_subscriber(2, "gcrm").0;
let req = AddSubRequest {
topic_list: vec![Topic::Heartbeat],
let req = SubscribeRequest {
topics: vec![Topic::Heartbeat],
subscriber,
};
manager.subscribe(req).unwrap();
let ret = manager.subscribers_by_topic(&Topic::Heartbeat);
assert_eq!(2, ret.len());
let req = UnSubRequest { subscriber_id: 1 };
manager.un_subscribe(req).unwrap();
let req = UnsubscribeRequest { subscriber_id: 1 };
manager.unsubscribe(req).unwrap();
let ret = manager.subscribers_by_topic(&Topic::Heartbeat);
assert_eq!(1, ret.len());
let req = UnSubRequest { subscriber_id: 2 };
manager.un_subscribe(req).unwrap();
let req = UnsubscribeRequest { subscriber_id: 2 };
manager.unsubscribe(req).unwrap();
let ret = manager.subscribers_by_topic(&Topic::Heartbeat);
assert_eq!(0, ret.len());
}