From ca409a732f63012c8e3901ed78a8621cab897b24 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Fri, 17 May 2024 11:00:15 +0800 Subject: [PATCH] refactor(naming): use the better naming for pubsub (#3960) --- .../src/handler/publish_heartbeat_handler.rs | 10 ++-- src/meta-srv/src/metasrv.rs | 16 ++--- src/meta-srv/src/metasrv/builder.rs | 4 +- src/meta-srv/src/pubsub.rs | 6 +- src/meta-srv/src/pubsub/publish.rs | 44 +++++++------- src/meta-srv/src/pubsub/subscribe_manager.rs | 58 +++++++++---------- src/meta-srv/src/pubsub/tests.rs | 52 ++++++++--------- 7 files changed, 94 insertions(+), 96 deletions(-) diff --git a/src/meta-srv/src/handler/publish_heartbeat_handler.rs b/src/meta-srv/src/handler/publish_heartbeat_handler.rs index b5fb8572f5..67368fb218 100644 --- a/src/meta-srv/src/handler/publish_heartbeat_handler.rs +++ b/src/meta-srv/src/handler/publish_heartbeat_handler.rs @@ -18,15 +18,15 @@ use async_trait::async_trait; use crate::error::Result; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -use crate::pubsub::{Message, PublishRef}; +use crate::pubsub::{Message, PublisherRef}; pub struct PublishHeartbeatHandler { - publish: PublishRef, + publisher: PublisherRef, } impl PublishHeartbeatHandler { - pub fn new(publish: PublishRef) -> PublishHeartbeatHandler { - PublishHeartbeatHandler { publish } + pub fn new(publisher: PublisherRef) -> PublishHeartbeatHandler { + PublishHeartbeatHandler { publisher } } } @@ -43,7 +43,7 @@ impl HeartbeatHandler for PublishHeartbeatHandler { _: &mut HeartbeatAccumulator, ) -> Result { let msg = Message::Heartbeat(Box::new(req.clone())); - self.publish.send_msg(msg).await; + self.publisher.publish(msg).await; Ok(HandleControl::Continue) } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index f058d49b4a..fb5b302e03 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -53,7 +53,7 @@ use crate::handler::HeartbeatHandlerGroup; use crate::lease::lookup_alive_datanode_peer; use crate::lock::DistLockRef; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; -use crate::pubsub::{PublishRef, SubscribeManagerRef}; +use crate::pubsub::{PublisherRef, SubscriptionManagerRef}; use crate::selector::{Selector, SelectorType}; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::LeaderCachedKvBackend; @@ -256,7 +256,7 @@ pub type ElectionRef = Arc>; pub struct MetaStateHandler { procedure_manager: ProcedureManagerRef, wal_options_allocator: WalOptionsAllocatorRef, - subscribe_manager: Option, + subscribe_manager: Option, greptimedb_telemetry_task: Arc, leader_cached_kv_backend: Arc, state: StateRef, @@ -295,7 +295,7 @@ impl MetaStateHandler { if let Some(sub_manager) = self.subscribe_manager.clone() { info!("Leader changed, un_subscribe all"); - if let Err(e) = sub_manager.un_subscribe_all() { + if let Err(e) = sub_manager.unsubscribe_all() { error!("Failed to un_subscribe all, error: {}", e); } } @@ -351,7 +351,7 @@ impl Metasrv { let procedure_manager = self.procedure_manager.clone(); let in_memory = self.in_memory.clone(); let leader_cached_kv_backend = self.leader_cached_kv_backend.clone(); - let subscribe_manager = self.subscribe_manager(); + let subscribe_manager = self.subscription_manager(); let mut rx = election.subscribe_leader_change(); let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone(); greptimedb_telemetry_task @@ -540,12 +540,12 @@ impl Metasrv { &self.region_migration_manager } - pub fn publish(&self) -> Option { - self.plugins.get::() + pub fn publish(&self) -> Option { + self.plugins.get::() } - pub fn subscribe_manager(&self) -> Option { - self.plugins.get::() + pub fn subscription_manager(&self) -> Option { + self.plugins.get::() } pub fn plugins(&self) -> &Plugins { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 5e082fa4ac..ddf8908773 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -66,7 +66,7 @@ use crate::metasrv::{ use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; -use crate::pubsub::PublishRef; +use crate::pubsub::PublisherRef; use crate::selector::lease_based::LeaseBasedSelector; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::LeaderCachedKvBackend; @@ -320,7 +320,7 @@ impl MetasrvBuilder { let publish_heartbeat_handler = plugins .clone() - .and_then(|plugins| plugins.get::()) + .and_then(|plugins| plugins.get::()) .map(|publish| PublishHeartbeatHandler::new(publish.clone())); let region_lease_handler = RegionLeaseHandler::new( diff --git a/src/meta-srv/src/pubsub.rs b/src/meta-srv/src/pubsub.rs index 0560861ebc..aceb94c901 100644 --- a/src/meta-srv/src/pubsub.rs +++ b/src/meta-srv/src/pubsub.rs @@ -20,10 +20,10 @@ mod subscriber; #[cfg(test)] mod tests; -pub use publish::{DefaultPublish, Publish, PublishRef}; +pub use publish::{DefaultPublisher, Publisher, PublisherRef}; pub use subscribe_manager::{ - AddSubRequest, DefaultSubscribeManager, SubscribeManager, SubscribeManagerRef, SubscribeQuery, - UnSubRequest, + DefaultSubscribeManager, SubscribeRequest, SubscriptionManager, SubscriptionManagerRef, + SubscriptionQuery, UnsubscribeRequest, }; pub use subscriber::{Subscriber, SubscriberRef, Transport}; diff --git a/src/meta-srv/src/pubsub/publish.rs b/src/meta-srv/src/pubsub/publish.rs index 8657b376c6..124ead92a5 100644 --- a/src/meta-srv/src/pubsub/publish.rs +++ b/src/meta-srv/src/pubsub/publish.rs @@ -18,53 +18,53 @@ use std::sync::Arc; use common_telemetry::error; -use crate::pubsub::{Message, SubscribeManager, Transport, UnSubRequest}; +use crate::pubsub::{Message, SubscriptionManager, Transport, UnsubscribeRequest}; -/// This trait provides a `send_msg` method that can be used by other modules +/// This trait provides a `publish` method that can be used by other modules /// of meta to publish [Message]. #[async_trait::async_trait] -pub trait Publish: Send + Sync { - async fn send_msg(&self, message: Message); +pub trait Publisher: Send + Sync { + async fn publish(&self, message: Message); } -pub type PublishRef = Arc; +pub type PublisherRef = Arc; -/// The default implementation of [Publish] -pub struct DefaultPublish { - subscribe_manager: Arc, +/// The default implementation of [Publisher] +pub struct DefaultPublisher { + subscription_manager: Arc, _transport: PhantomData, } -impl DefaultPublish { - pub fn new(subscribe_manager: Arc) -> Self { +impl DefaultPublisher { + pub fn new(subscription_manager: Arc) -> Self { Self { - subscribe_manager, + subscription_manager, _transport: PhantomData, } } } #[async_trait::async_trait] -impl Publish for DefaultPublish +impl Publisher for DefaultPublisher where - M: SubscribeManager, + M: SubscriptionManager, T: Transport + Debug, { - async fn send_msg(&self, message: Message) { - let sub_list = self - .subscribe_manager + async fn publish(&self, message: Message) { + let subscribers = self + .subscription_manager .subscribers_by_topic(&message.topic()); - for sub in sub_list { - if sub.transport_msg(message.clone()).await.is_err() { + for subscriber in subscribers { + if subscriber.transport_msg(message.clone()).await.is_err() { // If an error occurs, we consider the subscriber offline, // so un_subscribe here. - let req = UnSubRequest { - subscriber_id: sub.id(), + let req = UnsubscribeRequest { + subscriber_id: subscriber.id(), }; - if let Err(e) = self.subscribe_manager.un_subscribe(req.clone()) { - error!(e; "failed to un_subscribe, req: {:?}", req); + if let Err(e) = self.subscription_manager.unsubscribe(req.clone()) { + error!(e; "failed to unsubscribe, req: {:?}", req); } } } diff --git a/src/meta-srv/src/pubsub/subscribe_manager.rs b/src/meta-srv/src/pubsub/subscribe_manager.rs index d1fa1f2c73..58b57002e7 100644 --- a/src/meta-srv/src/pubsub/subscribe_manager.rs +++ b/src/meta-srv/src/pubsub/subscribe_manager.rs @@ -21,94 +21,92 @@ use tokio::sync::mpsc::Sender; use crate::error::Result; use crate::pubsub::{Message, Subscriber, SubscriberRef, Topic, Transport}; -pub trait SubscribeQuery: Send + Sync { +pub trait SubscriptionQuery: Send + Sync { fn subscribers_by_topic(&self, topic: &Topic) -> Vec>; } -pub trait SubscribeManager: SubscribeQuery { - fn subscribe(&self, req: AddSubRequest) -> Result<()>; +pub trait SubscriptionManager: SubscriptionQuery { + fn subscribe(&self, req: SubscribeRequest) -> Result<()>; - fn un_subscribe(&self, req: UnSubRequest) -> Result<()>; + fn unsubscribe(&self, req: UnsubscribeRequest) -> Result<()>; - fn un_subscribe_all(&self) -> Result<()>; + fn unsubscribe_all(&self) -> Result<()>; } -pub type SubscribeManagerRef = Arc>>; +pub type SubscriptionManagerRef = Arc>>; -pub struct AddSubRequest { - pub topic_list: Vec, +pub struct SubscribeRequest { + pub topics: Vec, pub subscriber: Subscriber, } #[derive(Debug, Clone)] -pub struct UnSubRequest { +pub struct UnsubscribeRequest { pub subscriber_id: u32, } + pub struct DefaultSubscribeManager { - topic2sub: DashMap>>>, + topic_to_subscribers: DashMap>>>, } impl Default for DefaultSubscribeManager { fn default() -> Self { Self { - topic2sub: DashMap::new(), + topic_to_subscribers: DashMap::new(), } } } -impl SubscribeQuery for DefaultSubscribeManager +impl SubscriptionQuery for DefaultSubscribeManager where T: Transport, { fn subscribers_by_topic(&self, topic: &Topic) -> Vec> { - self.topic2sub + self.topic_to_subscribers .get(topic) .map(|list_ref| list_ref.clone()) .unwrap_or_default() } } -impl SubscribeManager for DefaultSubscribeManager +impl SubscriptionManager for DefaultSubscribeManager where T: Transport, { - fn subscribe(&self, req: AddSubRequest) -> Result<()> { - let AddSubRequest { - topic_list, - subscriber, - } = req; + fn subscribe(&self, req: SubscribeRequest) -> Result<()> { + let SubscribeRequest { topics, subscriber } = req; info!( - "Add a subscription, subscriber_id: {}, subscriber_name: {}, topic list: {:?}", + "Add a subscriber, subscriber_id: {}, subscriber_name: {}, topics: {:?}", subscriber.id(), subscriber.name(), - topic_list + topics ); let subscriber = Arc::new(subscriber); - for topic in topic_list { - let mut entry = self.topic2sub.entry(topic).or_default(); + for topic in topics { + let mut entry = self.topic_to_subscribers.entry(topic).or_default(); entry.push(subscriber.clone()); } Ok(()) } - fn un_subscribe(&self, req: UnSubRequest) -> Result<()> { - let UnSubRequest { subscriber_id } = req; + fn unsubscribe(&self, req: UnsubscribeRequest) -> Result<()> { + let UnsubscribeRequest { subscriber_id } = req; - info!("Add a un_subscription, subscriber_id: {}", subscriber_id); + info!("Remove a subscriber, subscriber_id: {}", subscriber_id); - for mut sub_list in self.topic2sub.iter_mut() { - sub_list.retain(|subscriber| subscriber.id() != subscriber_id) + for mut subscribers in self.topic_to_subscribers.iter_mut() { + subscribers.retain(|subscriber| subscriber.id() != subscriber_id) } Ok(()) } - fn un_subscribe_all(&self) -> Result<()> { - self.topic2sub.clear(); + fn unsubscribe_all(&self) -> Result<()> { + self.topic_to_subscribers.clear(); Ok(()) } diff --git a/src/meta-srv/src/pubsub/tests.rs b/src/meta-srv/src/pubsub/tests.rs index 41f1e3e95d..6cf5b47c9a 100644 --- a/src/meta-srv/src/pubsub/tests.rs +++ b/src/meta-srv/src/pubsub/tests.rs @@ -19,8 +19,8 @@ use tokio::sync::mpsc::{Receiver, Sender}; use super::DefaultSubscribeManager; use crate::pubsub::{ - AddSubRequest, DefaultPublish, Message, Publish, SubscribeManager, SubscribeQuery, Subscriber, - Topic, UnSubRequest, + DefaultPublisher, Message, Publisher, SubscribeRequest, Subscriber, SubscriptionManager, + SubscriptionQuery, Topic, UnsubscribeRequest, }; #[tokio::test] @@ -28,15 +28,15 @@ async fn test_pubsub() { let manager = Arc::new(DefaultSubscribeManager::default()); let (subscriber1, mut rx1) = mock_subscriber(1, "tidigong"); - let req = AddSubRequest { - topic_list: vec![Topic::Heartbeat], + let req = SubscribeRequest { + topics: vec![Topic::Heartbeat], subscriber: subscriber1, }; manager.subscribe(req).unwrap(); let (subscriber2, mut rx2) = mock_subscriber(2, "gcrm"); - let req = AddSubRequest { - topic_list: vec![Topic::Heartbeat], + let req = SubscribeRequest { + topics: vec![Topic::Heartbeat], subscriber: subscriber2, }; manager.subscribe(req).unwrap(); @@ -44,10 +44,10 @@ async fn test_pubsub() { let manager_clone = manager.clone(); let message_number: usize = 5; tokio::spawn(async move { - let publisher: DefaultPublish>, Sender> = - DefaultPublish::new(manager_clone); + let publisher: DefaultPublisher>, Sender> = + DefaultPublisher::new(manager_clone); for _ in 0..message_number { - publisher.send_msg(mock_message()).await; + publisher.publish(mock_message()).await; } }); @@ -59,12 +59,12 @@ async fn test_pubsub() { } manager - .un_subscribe(UnSubRequest { subscriber_id: 1 }) + .unsubscribe(UnsubscribeRequest { subscriber_id: 1 }) .unwrap(); let may_msg = rx1.recv().await; assert!(may_msg.is_none()); - manager.un_subscribe_all().unwrap(); + manager.unsubscribe_all().unwrap(); let may_msg = rx2.recv().await; assert!(may_msg.is_none()); } @@ -74,15 +74,15 @@ async fn test_subscriber_disconnect() { let manager = Arc::new(DefaultSubscribeManager::default()); let (subscriber1, rx1) = mock_subscriber(1, "tidigong"); - let req = AddSubRequest { - topic_list: vec![Topic::Heartbeat], + let req = SubscribeRequest { + topics: vec![Topic::Heartbeat], subscriber: subscriber1, }; manager.subscribe(req).unwrap(); let (subscriber2, rx2) = mock_subscriber(2, "gcrm"); - let req = AddSubRequest { - topic_list: vec![Topic::Heartbeat], + let req = SubscribeRequest { + topics: vec![Topic::Heartbeat], subscriber: subscriber2, }; manager.subscribe(req).unwrap(); @@ -90,10 +90,10 @@ async fn test_subscriber_disconnect() { let manager_clone = manager.clone(); let message_number: usize = 5; let join = tokio::spawn(async move { - let publisher: DefaultPublish>, Sender> = - DefaultPublish::new(manager_clone); + let publisher: DefaultPublisher>, Sender> = + DefaultPublisher::new(manager_clone); for _ in 0..message_number { - publisher.send_msg(mock_message()).await; + publisher.publish(mock_message()).await; } }); @@ -118,8 +118,8 @@ fn test_sub_manager() { let manager = DefaultSubscribeManager::default(); let subscriber = mock_subscriber(1, "tidigong").0; - let req = AddSubRequest { - topic_list: vec![Topic::Heartbeat], + let req = SubscribeRequest { + topics: vec![Topic::Heartbeat], subscriber, }; manager.subscribe(req).unwrap(); @@ -127,21 +127,21 @@ fn test_sub_manager() { assert_eq!(1, ret.len()); let subscriber = mock_subscriber(2, "gcrm").0; - let req = AddSubRequest { - topic_list: vec![Topic::Heartbeat], + let req = SubscribeRequest { + topics: vec![Topic::Heartbeat], subscriber, }; manager.subscribe(req).unwrap(); let ret = manager.subscribers_by_topic(&Topic::Heartbeat); assert_eq!(2, ret.len()); - let req = UnSubRequest { subscriber_id: 1 }; - manager.un_subscribe(req).unwrap(); + let req = UnsubscribeRequest { subscriber_id: 1 }; + manager.unsubscribe(req).unwrap(); let ret = manager.subscribers_by_topic(&Topic::Heartbeat); assert_eq!(1, ret.len()); - let req = UnSubRequest { subscriber_id: 2 }; - manager.un_subscribe(req).unwrap(); + let req = UnsubscribeRequest { subscriber_id: 2 }; + manager.unsubscribe(req).unwrap(); let ret = manager.subscribers_by_topic(&Topic::Heartbeat); assert_eq!(0, ret.len()); }