diff --git a/Cargo.lock b/Cargo.lock index b690b6ec56..ffb9bb124c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1919,6 +1919,7 @@ dependencies = [ "hyper", "metrics", "once_cell", + "parking_lot 0.12.1", "prost 0.11.2", "tokio", "tokio-stream", diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 209f13fe50..395ab8a275 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -22,6 +22,7 @@ git-version = "0.3.5" humantime = "2.1.0" hyper = {version = "0.14.14", features = ["full"]} once_cell = "1.13.0" +parking_lot = "0.12" prost = "0.11" tonic = "0.8" tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } diff --git a/broker/src/bin/bench.rs b/broker/src/bin/bench.rs index 0e518bc767..d3df85ce90 100644 --- a/broker/src/bin/bench.rs +++ b/broker/src/bin/bench.rs @@ -165,7 +165,8 @@ async fn main() -> Result<(), Box> { } for _i in 0..args.num_pubs { // let c = Some(c.clone()); - tokio::spawn(publish(None, args.num_subs as u64)); + let c = None; + tokio::spawn(publish(c, args.num_subs as u64)); } h.await?; diff --git a/broker/src/bin/neon_broker.rs b/broker/src/bin/neon_broker.rs index 80f3e23b37..c2cea84c06 100644 --- a/broker/src/bin/neon_broker.rs +++ b/broker/src/bin/neon_broker.rs @@ -20,29 +20,26 @@ use futures_util::StreamExt; use hyper::header::CONTENT_TYPE; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, StatusCode}; -use metrics::{Encoder, TextEncoder}; -use neon_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE}; -use neon_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR}; -use std::collections::hash_map::Entry; +use parking_lot::RwLock; use std::collections::HashMap; use std::convert::Infallible; -use std::fmt; use std::net::SocketAddr; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; -use tokio::select; -use tokio::sync::mpsc::error::TrySendError; -use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio_stream::wrappers::ReceiverStream; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::RecvError; use tonic::codegen::Service; use tonic::Code; use tonic::{Request, Response, Status}; use tracing::*; +use metrics::{Encoder, TextEncoder}; +use neon_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE}; use neon_broker::neon_broker_proto::neon_broker_server::{NeonBroker, NeonBrokerServer}; use neon_broker::neon_broker_proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey; use neon_broker::neon_broker_proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest}; +use neon_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR}; use utils::id::TenantTimelineId; use utils::logging::{self, LogFormat}; use utils::project_git_version; @@ -90,234 +87,156 @@ impl SubscriptionKey { } } -// Subscriber id + tx end of the channel for messages to it. -#[derive(Clone)] -struct SubSender(SubId, Sender); - -impl fmt::Debug for SubSender { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Subscription id {}", self.0) - } +// Channel to timeline subscribers. +struct ChanToTimelineSub { + chan: broadcast::Sender, + // Tracked separately to know when delete the shmem entry. receiver_count() + // is unhandy for that as unregistering and dropping the receiver side + // happens at different moments. + num_subscribers: u64, } -// Announcements subscriber sends to publisher(s) asking it to stream to the -// provided channel, or forget about it, releasing memory. -#[derive(Clone)] -enum SubAnnounce { - /// Add subscription to all timelines - AddAll(Sender), - /// Add subscription to the specific timeline - AddTimeline(TenantTimelineId, SubSender), - /// Remove subscription to the specific timeline - RemoveTimeline(TenantTimelineId, SubId), - // RemoveAll is not needed as publisher will notice closed channel while - // trying to send the next message. -} - -#[derive(Default)] struct SharedState { - // Registered publishers. They sit on the rx end of these channels and - // receive through it tx handles of chans to subscribers. - // - // Note: publishers don't identify which keys they publish, so each - // publisher will receive channels to all subs and filter them before sending. - pub_txs: HashMap>, next_pub_id: PubId, - // Registered subscribers -- when publisher joins it walks over them, - // collecting txs to send messages. - subs_to_all: HashMap>, - subs_to_timelines: HashMap>, - num_subs_to_timelines: i64, + num_pubs: i64, next_sub_id: SubId, -} - -// Utility func to remove subscription from the map -fn remove_sub( - subs_to_timelines: &mut HashMap>, - ttid: &TenantTimelineId, - sub_id: SubId, -) { - if let Some(subsenders) = subs_to_timelines.get_mut(ttid) { - subsenders.retain(|ss| ss.0 != sub_id); - if subsenders.is_empty() { - subs_to_timelines.remove(ttid); - } - } - // Note that subscription might be not here if subscriber task was aborted - // earlier than it managed to notify publisher about itself. + num_subs_to_timelines: i64, + chans_to_timeline_subs: HashMap, + num_subs_to_all: i64, + chan_to_all_subs: broadcast::Sender, } impl SharedState { - // Register new publisher. - pub fn register_publisher(&mut self, announce_tx: Sender) -> PubId { - let pub_id = self.next_pub_id; - self.next_pub_id += 1; - assert!(!self.pub_txs.contains_key(&pub_id)); - self.pub_txs.insert(pub_id, announce_tx); - NUM_PUBS.set(self.pub_txs.len() as i64); - pub_id + pub fn new(chan_size: usize) -> Self { + SharedState { + next_pub_id: 0, + num_pubs: 0, + next_sub_id: 0, + num_subs_to_timelines: 0, + chans_to_timeline_subs: HashMap::new(), + num_subs_to_all: 0, + chan_to_all_subs: broadcast::channel(chan_size).0, + } } - pub fn unregister_publisher(&mut self, pub_id: PubId) { - assert!(self.pub_txs.contains_key(&pub_id)); - self.pub_txs.remove(&pub_id); - NUM_PUBS.set(self.pub_txs.len() as i64); + // Register new publisher. + pub fn register_publisher(&mut self, registry: Registry) -> Publisher { + let pub_id = self.next_pub_id; + self.next_pub_id += 1; + self.num_pubs += 1; + NUM_PUBS.set(self.num_pubs); + Publisher { + id: pub_id, + registry, + } + } + + // Unregister publisher. + pub fn unregister_publisher(&mut self) { + self.num_pubs -= 1; + NUM_PUBS.set(self.num_pubs); } // Register new subscriber. - // Returns list of channels through which existing publishers must be notified - // about new subscriber; we can't do it here due to risk of deadlock. pub fn register_subscriber( &mut self, sub_key: SubscriptionKey, - sub_tx: Sender, - ) -> (SubId, Vec>, SubAnnounce) { + registry: Registry, + chan_size: usize, + ) -> Subscriber { let sub_id = self.next_sub_id; self.next_sub_id += 1; - let announce = match sub_key { + let sub_rx = match sub_key { SubscriptionKey::All => { - assert!(!self.subs_to_all.contains_key(&sub_id)); - self.subs_to_all.insert(sub_id, sub_tx.clone()); - NUM_SUBS_ALL.set(self.subs_to_all.len() as i64); - SubAnnounce::AddAll(sub_tx) + self.num_subs_to_all += 1; + NUM_SUBS_ALL.set(self.num_subs_to_all); + self.chan_to_all_subs.subscribe() } SubscriptionKey::Timeline(ttid) => { - self.subs_to_timelines - .entry(ttid) - .or_default() - .push(SubSender(sub_id, sub_tx.clone())); self.num_subs_to_timelines += 1; NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines); - SubAnnounce::AddTimeline(ttid, SubSender(sub_id, sub_tx)) + // Create new broadcast channel for this key, or subscriber to + // the existing one. + let chan_to_timeline_sub = + self.chans_to_timeline_subs + .entry(ttid) + .or_insert(ChanToTimelineSub { + chan: broadcast::channel(chan_size).0, + num_subscribers: 0, + }); + chan_to_timeline_sub.num_subscribers += 1; + chan_to_timeline_sub.chan.subscribe() } }; - // Collect existing publishers to notify them after lock is released; - // TODO: the probability of channels being full here is tiny (publisher - // always blocks listening chan), we can try sending first and resort to - // cloning if needed. - // - // Deadlock is possible only if publisher tries to access shared state - // during its lifetime, i.e. we add maintenance of set of published - // tlis. Otherwise we can just await here (but lock must be replaced - // with Tokio one). - // - // We could also just error out if some chan is full, but that needs - // cleanup of incompleted job, and notifying publishers when unregistering - // is mandatory anyway. - (sub_id, self.pub_txs.values().cloned().collect(), announce) + Subscriber { + id: sub_id, + key: sub_key, + sub_rx, + registry, + } } - // Unregister the subscriber. Similar to register_subscriber, returns list - // of channels through which publishers must be notified about the removal. - pub fn unregister_subscriber( - &mut self, - sub_id: SubId, - sub_key: SubscriptionKey, - ) -> Option<(Vec>, SubAnnounce)> { - // We need to notify existing publishers only about per timeline - // subscriptions, 'all' kind is detected on its own through closed - // channels. - let announce = match sub_key { + // Unregister the subscriber. + pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) { + match sub_key { SubscriptionKey::All => { - assert!(self.subs_to_all.contains_key(&sub_id)); - self.subs_to_all.remove(&sub_id); - NUM_SUBS_ALL.set(self.subs_to_all.len() as i64); - None + self.num_subs_to_all -= 1; + NUM_SUBS_ALL.set(self.num_subs_to_all); } - SubscriptionKey::Timeline(ref ttid) => { - remove_sub(&mut self.subs_to_timelines, ttid, sub_id); + SubscriptionKey::Timeline(ttid) => { self.num_subs_to_timelines -= 1; NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines); - Some(SubAnnounce::RemoveTimeline(*ttid, sub_id)) + + // Remove from the map, destroying the channel, if we are the + // last subscriber to this timeline. + + // Missing entry is a bug; we must have registered. + let chan_to_timeline_sub = self.chans_to_timeline_subs.get_mut(&ttid).unwrap(); + chan_to_timeline_sub.num_subscribers -= 1; + if chan_to_timeline_sub.num_subscribers == 0 { + self.chans_to_timeline_subs.remove(&ttid); + } } - }; - announce.map(|a| (self.pub_txs.values().cloned().collect(), a)) + } } } -// SharedState wrapper for post-locking operations (sending to pub_tx chans). +// SharedState wrapper. #[derive(Clone)] struct Registry { - shared_state: Arc>, + shared_state: Arc>, chan_size: usize, } -const PUB_NOTIFY_CHAN_SIZE: usize = 128; - impl Registry { // Register new publisher in shared state. pub fn register_publisher(&self) -> Publisher { - let (announce_tx, announce_rx) = mpsc::channel(PUB_NOTIFY_CHAN_SIZE); - let mut ss = self.shared_state.lock().unwrap(); - let id = ss.register_publisher(announce_tx); - let (subs_to_all, subs_to_timelines) = ( - ss.subs_to_all.values().cloned().collect(), - ss.subs_to_timelines.clone(), - ); - drop(ss); - trace!("registered publisher {}", id); - Publisher { - id, - announce_rx: announce_rx.into(), - subs_to_all, - subs_to_timelines, - registry: self.clone(), - } + let publisher = self.shared_state.write().register_publisher(self.clone()); + trace!("registered publisher {}", publisher.id); + publisher } pub fn unregister_publisher(&self, publisher: &Publisher) { - self.shared_state - .lock() - .unwrap() - .unregister_publisher(publisher.id); + self.shared_state.write().unregister_publisher(); trace!("unregistered publisher {}", publisher.id); } // Register new subscriber in shared state. - pub async fn register_subscriber(&self, sub_key: SubscriptionKey) -> Subscriber { - let (tx, rx) = mpsc::channel(self.chan_size); - let id; - let mut pub_txs; - let announce; - { - let mut ss = self.shared_state.lock().unwrap(); - (id, pub_txs, announce) = ss.register_subscriber(sub_key, tx); - } - // Note: it is important to create Subscriber before .await. If client - // disconnects during await, which would terminate the Future we still - // need to run Subscriber's drop() which will unregister it from the - // shared state. - let subscriber = Subscriber { - id, - key: sub_key, - sub_rx: rx, - registry: self.clone(), - }; - // Notify existing publishers about new subscriber. - for pub_tx in pub_txs.iter_mut() { - // Closed channel is fine; it means publisher has gone. - pub_tx.send(announce.clone()).await.ok(); - } - trace!("registered subscriber {}", id); + pub fn register_subscriber(&self, sub_key: SubscriptionKey) -> Subscriber { + let subscriber = + self.shared_state + .write() + .register_subscriber(sub_key, self.clone(), self.chan_size); + trace!("registered subscriber {}", subscriber.id); subscriber } // Unregister the subscriber - pub fn unregister_subscriber(&self, sub: &Subscriber) { - let mut ss = self.shared_state.lock().unwrap(); - let announce_pack = ss.unregister_subscriber(sub.id, sub.key); - drop(ss); - // Notify publishers about the removal. Apart from wanting to do it - // outside lock, here we also spin a task as Drop impl can't be async. - if let Some((mut pub_txs, announce)) = announce_pack { - tokio::spawn(async move { - for pub_tx in pub_txs.iter_mut() { - // Closed channel is fine; it means publisher has gone. - pub_tx.send(announce.clone()).await.ok(); - } - }); - } - trace!("unregistered subscriber {}", sub.id); + pub fn unregister_subscriber(&self, subscriber: &Subscriber) { + self.shared_state + .write() + .unregister_subscriber(subscriber.key); + trace!("unregistered subscriber {}", subscriber.id); } } @@ -326,7 +245,7 @@ struct Subscriber { id: SubId, key: SubscriptionKey, // Subscriber receives messages from publishers here. - sub_rx: Receiver, + sub_rx: broadcast::Receiver, // to unregister itself from shared state in Drop registry: Registry, } @@ -340,12 +259,6 @@ impl Drop for Subscriber { // Private publisher state struct Publisher { id: PubId, - // new subscribers request to send (or stop sending) msgs them here. - // It could be just Receiver, but weirdly it doesn't implement futures_core Stream directly. - announce_rx: ReceiverStream, - subs_to_all: Vec>, - subs_to_timelines: HashMap>, - // to unregister itself from shared state in Drop registry: Registry, } @@ -353,59 +266,22 @@ impl Publisher { // Send msg to relevant subscribers. pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> { // send message to subscribers for everything - let mut cleanup_subs_to_all = false; - for sub in self.subs_to_all.iter() { - match sub.try_send(msg.clone()) { - Err(TrySendError::Full(_)) => { - warn!("dropping message, channel is full"); - } - Err(TrySendError::Closed(_)) => { - cleanup_subs_to_all = true; - } - _ => (), - } - } - // some channels got closed (subscriber gone), remove them - if cleanup_subs_to_all { - self.subs_to_all.retain(|tx| !tx.is_closed()); - } + let shared_state = self.registry.shared_state.read(); + // Err means there is no subscribers, it is fine. + shared_state.chan_to_all_subs.send(msg.clone()).ok(); // send message to per timeline subscribers let ttid = parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| { Status::new(Code::InvalidArgument, "missing tenant_timeline_id") })?)?; - if let Some(subs) = self.subs_to_timelines.get(&ttid) { - for tx in subs.iter().map(|sub_sender| &sub_sender.1) { - if let Err(TrySendError::Full(_)) = tx.try_send(msg.clone()) { - warn!("dropping message, channel is full"); - } - // closed channel is ignored here; we will be notified and remove it soon - } + if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) { + // Err can't happen here, as rx is destroyed after removing from the + // map the last subscriber. + subs.chan.send(msg.clone()).unwrap(); } Ok(()) } - - // Add/remove subscriber according to sub_announce. - pub fn update_sub(&mut self, sub_announce: SubAnnounce) { - match sub_announce { - SubAnnounce::AddAll(tx) => self.subs_to_all.push(tx), - SubAnnounce::AddTimeline(ttid, sub_sender) => { - match self.subs_to_timelines.entry(ttid) { - Entry::Occupied(mut o) => { - let subsenders = o.get_mut(); - subsenders.push(sub_sender); - } - Entry::Vacant(v) => { - v.insert(vec![sub_sender]); - } - } - } - SubAnnounce::RemoveTimeline(ref ttid, sub_id) => { - remove_sub(&mut self.subs_to_timelines, ttid, sub_id); - } - } - } } impl Drop for Publisher { @@ -429,17 +305,10 @@ impl NeonBroker for NeonBrokerImpl { let mut stream = request.into_inner(); loop { - select! { - msg = stream.next() => { - match msg { - Some(Ok(msg)) => publisher.send_msg(&msg)?, - Some(Err(e)) => return Err(e), // grpc error from the stream - None => break // closed stream - } - } - Some(announce) = publisher.announce_rx.next() => { - publisher.update_sub(announce); - } + match stream.next().await { + Some(Ok(msg)) => publisher.send_msg(&msg)?, + Some(Err(e)) => return Err(e), // grpc error from the stream + None => break, // closed stream } } @@ -458,13 +327,22 @@ impl NeonBroker for NeonBrokerImpl { .subscription_key .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?; let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?; - let mut subscriber = self.registry.register_subscriber(sub_key).await; + let mut subscriber = self.registry.register_subscriber(sub_key); // transform rx into stream with item = Result, as method result demands let output = async_stream::try_stream! { - while let Some(info) = subscriber.sub_rx.recv().await { - yield info + loop { + match subscriber.sub_rx.recv().await { + Ok(info) => yield info, + Err(RecvError::Lagged(_skipped_msg)) => { + // warn!("dropped {} messages, channel is full", skipped_msg); + } + Err(RecvError::Closed) => { + // can't happen, we never drop the channel while there is a subscriber + Err(Status::new(Code::Internal, "channel unexpectantly closed"))?; + } } + } }; Ok(Response::new( @@ -506,7 +384,7 @@ async fn main() -> Result<(), Box> { info!("version: {GIT_VERSION}"); let registry = Registry { - shared_state: Arc::new(Mutex::new(SharedState::default())), + shared_state: Arc::new(RwLock::new(SharedState::new(args.chan_size))), chan_size: args.chan_size, }; let neon_broker_impl = NeonBrokerImpl { @@ -560,7 +438,7 @@ async fn main() -> Result<(), Box> { mod tests { use super::*; use neon_broker::neon_broker_proto::TenantTimelineId as ProtoTenantTimelineId; - use tokio::sync::mpsc::error::TryRecvError; + use tokio::sync::broadcast::error::TryRecvError; use utils::id::{TenantId, TimelineId}; fn msg(timeline_id: Vec) -> SafekeeperTimelineInfo { @@ -590,7 +468,7 @@ mod tests { #[tokio::test] async fn test_registry() { let registry = Registry { - shared_state: Arc::new(Mutex::new(SharedState::default())), + shared_state: Arc::new(RwLock::new(SharedState::new(16))), chan_size: 16, }; @@ -600,8 +478,8 @@ mod tests { timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(), }; let sub_key_2 = SubscriptionKey::Timeline(ttid_2); - let mut subscriber_2 = registry.register_subscriber(sub_key_2).await; - let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All).await; + let mut subscriber_2 = registry.register_subscriber(sub_key_2); + let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All); // send two messages with different keys let msg_1 = msg(tli_from_u64(1)); diff --git a/broker/src/metrics.rs b/broker/src/metrics.rs index 4173b3993e..23718d22cd 100644 --- a/broker/src/metrics.rs +++ b/broker/src/metrics.rs @@ -17,6 +17,9 @@ pub static NUM_SUBS_TIMELINE: Lazy = Lazy::new(|| { }); pub static NUM_SUBS_ALL: Lazy = Lazy::new(|| { - register_int_gauge!("broker_all_keys_active_subscribers", "Number of subsciptions to all keys") - .expect("Failed to register metric") + register_int_gauge!( + "broker_all_keys_active_subscribers", + "Number of subsciptions to all keys" + ) + .expect("Failed to register metric") });