Switch to broadcast channel and parking_lot rwlock.

This commit is contained in:
Arseny Sher
2022-11-21 09:31:46 +04:00
parent 7850fe4fa6
commit 7ffd740b5f
5 changed files with 141 additions and 257 deletions

1
Cargo.lock generated
View File

@@ -1919,6 +1919,7 @@ dependencies = [
"hyper",
"metrics",
"once_cell",
"parking_lot 0.12.1",
"prost 0.11.2",
"tokio",
"tokio-stream",

View File

@@ -22,6 +22,7 @@ git-version = "0.3.5"
humantime = "2.1.0"
hyper = {version = "0.14.14", features = ["full"]}
once_cell = "1.13.0"
parking_lot = "0.12"
prost = "0.11"
tonic = "0.8"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }

View File

@@ -165,7 +165,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
for _i in 0..args.num_pubs {
// let c = Some(c.clone());
tokio::spawn(publish(None, args.num_subs as u64));
let c = None;
tokio::spawn(publish(c, args.num_subs as u64));
}
h.await?;

View File

@@ -20,29 +20,26 @@ use futures_util::StreamExt;
use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, StatusCode};
use metrics::{Encoder, TextEncoder};
use neon_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
use neon_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR};
use std::collections::hash_map::Entry;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::convert::Infallible;
use std::fmt;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tonic::codegen::Service;
use tonic::Code;
use tonic::{Request, Response, Status};
use tracing::*;
use metrics::{Encoder, TextEncoder};
use neon_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
use neon_broker::neon_broker_proto::neon_broker_server::{NeonBroker, NeonBrokerServer};
use neon_broker::neon_broker_proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use neon_broker::neon_broker_proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use neon_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR};
use utils::id::TenantTimelineId;
use utils::logging::{self, LogFormat};
use utils::project_git_version;
@@ -90,234 +87,156 @@ impl SubscriptionKey {
}
}
// Subscriber id + tx end of the channel for messages to it.
#[derive(Clone)]
struct SubSender(SubId, Sender<SafekeeperTimelineInfo>);
impl fmt::Debug for SubSender {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Subscription id {}", self.0)
}
// Channel to timeline subscribers.
struct ChanToTimelineSub {
chan: broadcast::Sender<SafekeeperTimelineInfo>,
// Tracked separately to know when delete the shmem entry. receiver_count()
// is unhandy for that as unregistering and dropping the receiver side
// happens at different moments.
num_subscribers: u64,
}
// Announcements subscriber sends to publisher(s) asking it to stream to the
// provided channel, or forget about it, releasing memory.
#[derive(Clone)]
enum SubAnnounce {
/// Add subscription to all timelines
AddAll(Sender<SafekeeperTimelineInfo>),
/// Add subscription to the specific timeline
AddTimeline(TenantTimelineId, SubSender),
/// Remove subscription to the specific timeline
RemoveTimeline(TenantTimelineId, SubId),
// RemoveAll is not needed as publisher will notice closed channel while
// trying to send the next message.
}
#[derive(Default)]
struct SharedState {
// Registered publishers. They sit on the rx end of these channels and
// receive through it tx handles of chans to subscribers.
//
// Note: publishers don't identify which keys they publish, so each
// publisher will receive channels to all subs and filter them before sending.
pub_txs: HashMap<PubId, Sender<SubAnnounce>>,
next_pub_id: PubId,
// Registered subscribers -- when publisher joins it walks over them,
// collecting txs to send messages.
subs_to_all: HashMap<SubId, Sender<SafekeeperTimelineInfo>>,
subs_to_timelines: HashMap<TenantTimelineId, Vec<SubSender>>,
num_subs_to_timelines: i64,
num_pubs: i64,
next_sub_id: SubId,
}
// Utility func to remove subscription from the map
fn remove_sub(
subs_to_timelines: &mut HashMap<TenantTimelineId, Vec<SubSender>>,
ttid: &TenantTimelineId,
sub_id: SubId,
) {
if let Some(subsenders) = subs_to_timelines.get_mut(ttid) {
subsenders.retain(|ss| ss.0 != sub_id);
if subsenders.is_empty() {
subs_to_timelines.remove(ttid);
}
}
// Note that subscription might be not here if subscriber task was aborted
// earlier than it managed to notify publisher about itself.
num_subs_to_timelines: i64,
chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
num_subs_to_all: i64,
chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
}
impl SharedState {
// Register new publisher.
pub fn register_publisher(&mut self, announce_tx: Sender<SubAnnounce>) -> PubId {
let pub_id = self.next_pub_id;
self.next_pub_id += 1;
assert!(!self.pub_txs.contains_key(&pub_id));
self.pub_txs.insert(pub_id, announce_tx);
NUM_PUBS.set(self.pub_txs.len() as i64);
pub_id
pub fn new(chan_size: usize) -> Self {
SharedState {
next_pub_id: 0,
num_pubs: 0,
next_sub_id: 0,
num_subs_to_timelines: 0,
chans_to_timeline_subs: HashMap::new(),
num_subs_to_all: 0,
chan_to_all_subs: broadcast::channel(chan_size).0,
}
}
pub fn unregister_publisher(&mut self, pub_id: PubId) {
assert!(self.pub_txs.contains_key(&pub_id));
self.pub_txs.remove(&pub_id);
NUM_PUBS.set(self.pub_txs.len() as i64);
// Register new publisher.
pub fn register_publisher(&mut self, registry: Registry) -> Publisher {
let pub_id = self.next_pub_id;
self.next_pub_id += 1;
self.num_pubs += 1;
NUM_PUBS.set(self.num_pubs);
Publisher {
id: pub_id,
registry,
}
}
// Unregister publisher.
pub fn unregister_publisher(&mut self) {
self.num_pubs -= 1;
NUM_PUBS.set(self.num_pubs);
}
// Register new subscriber.
// Returns list of channels through which existing publishers must be notified
// about new subscriber; we can't do it here due to risk of deadlock.
pub fn register_subscriber(
&mut self,
sub_key: SubscriptionKey,
sub_tx: Sender<SafekeeperTimelineInfo>,
) -> (SubId, Vec<Sender<SubAnnounce>>, SubAnnounce) {
registry: Registry,
chan_size: usize,
) -> Subscriber {
let sub_id = self.next_sub_id;
self.next_sub_id += 1;
let announce = match sub_key {
let sub_rx = match sub_key {
SubscriptionKey::All => {
assert!(!self.subs_to_all.contains_key(&sub_id));
self.subs_to_all.insert(sub_id, sub_tx.clone());
NUM_SUBS_ALL.set(self.subs_to_all.len() as i64);
SubAnnounce::AddAll(sub_tx)
self.num_subs_to_all += 1;
NUM_SUBS_ALL.set(self.num_subs_to_all);
self.chan_to_all_subs.subscribe()
}
SubscriptionKey::Timeline(ttid) => {
self.subs_to_timelines
.entry(ttid)
.or_default()
.push(SubSender(sub_id, sub_tx.clone()));
self.num_subs_to_timelines += 1;
NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
SubAnnounce::AddTimeline(ttid, SubSender(sub_id, sub_tx))
// Create new broadcast channel for this key, or subscriber to
// the existing one.
let chan_to_timeline_sub =
self.chans_to_timeline_subs
.entry(ttid)
.or_insert(ChanToTimelineSub {
chan: broadcast::channel(chan_size).0,
num_subscribers: 0,
});
chan_to_timeline_sub.num_subscribers += 1;
chan_to_timeline_sub.chan.subscribe()
}
};
// Collect existing publishers to notify them after lock is released;
// TODO: the probability of channels being full here is tiny (publisher
// always blocks listening chan), we can try sending first and resort to
// cloning if needed.
//
// Deadlock is possible only if publisher tries to access shared state
// during its lifetime, i.e. we add maintenance of set of published
// tlis. Otherwise we can just await here (but lock must be replaced
// with Tokio one).
//
// We could also just error out if some chan is full, but that needs
// cleanup of incompleted job, and notifying publishers when unregistering
// is mandatory anyway.
(sub_id, self.pub_txs.values().cloned().collect(), announce)
Subscriber {
id: sub_id,
key: sub_key,
sub_rx,
registry,
}
}
// Unregister the subscriber. Similar to register_subscriber, returns list
// of channels through which publishers must be notified about the removal.
pub fn unregister_subscriber(
&mut self,
sub_id: SubId,
sub_key: SubscriptionKey,
) -> Option<(Vec<Sender<SubAnnounce>>, SubAnnounce)> {
// We need to notify existing publishers only about per timeline
// subscriptions, 'all' kind is detected on its own through closed
// channels.
let announce = match sub_key {
// Unregister the subscriber.
pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
match sub_key {
SubscriptionKey::All => {
assert!(self.subs_to_all.contains_key(&sub_id));
self.subs_to_all.remove(&sub_id);
NUM_SUBS_ALL.set(self.subs_to_all.len() as i64);
None
self.num_subs_to_all -= 1;
NUM_SUBS_ALL.set(self.num_subs_to_all);
}
SubscriptionKey::Timeline(ref ttid) => {
remove_sub(&mut self.subs_to_timelines, ttid, sub_id);
SubscriptionKey::Timeline(ttid) => {
self.num_subs_to_timelines -= 1;
NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
Some(SubAnnounce::RemoveTimeline(*ttid, sub_id))
// Remove from the map, destroying the channel, if we are the
// last subscriber to this timeline.
// Missing entry is a bug; we must have registered.
let chan_to_timeline_sub = self.chans_to_timeline_subs.get_mut(&ttid).unwrap();
chan_to_timeline_sub.num_subscribers -= 1;
if chan_to_timeline_sub.num_subscribers == 0 {
self.chans_to_timeline_subs.remove(&ttid);
}
}
};
announce.map(|a| (self.pub_txs.values().cloned().collect(), a))
}
}
}
// SharedState wrapper for post-locking operations (sending to pub_tx chans).
// SharedState wrapper.
#[derive(Clone)]
struct Registry {
shared_state: Arc<Mutex<SharedState>>,
shared_state: Arc<RwLock<SharedState>>,
chan_size: usize,
}
const PUB_NOTIFY_CHAN_SIZE: usize = 128;
impl Registry {
// Register new publisher in shared state.
pub fn register_publisher(&self) -> Publisher {
let (announce_tx, announce_rx) = mpsc::channel(PUB_NOTIFY_CHAN_SIZE);
let mut ss = self.shared_state.lock().unwrap();
let id = ss.register_publisher(announce_tx);
let (subs_to_all, subs_to_timelines) = (
ss.subs_to_all.values().cloned().collect(),
ss.subs_to_timelines.clone(),
);
drop(ss);
trace!("registered publisher {}", id);
Publisher {
id,
announce_rx: announce_rx.into(),
subs_to_all,
subs_to_timelines,
registry: self.clone(),
}
let publisher = self.shared_state.write().register_publisher(self.clone());
trace!("registered publisher {}", publisher.id);
publisher
}
pub fn unregister_publisher(&self, publisher: &Publisher) {
self.shared_state
.lock()
.unwrap()
.unregister_publisher(publisher.id);
self.shared_state.write().unregister_publisher();
trace!("unregistered publisher {}", publisher.id);
}
// Register new subscriber in shared state.
pub async fn register_subscriber(&self, sub_key: SubscriptionKey) -> Subscriber {
let (tx, rx) = mpsc::channel(self.chan_size);
let id;
let mut pub_txs;
let announce;
{
let mut ss = self.shared_state.lock().unwrap();
(id, pub_txs, announce) = ss.register_subscriber(sub_key, tx);
}
// Note: it is important to create Subscriber before .await. If client
// disconnects during await, which would terminate the Future we still
// need to run Subscriber's drop() which will unregister it from the
// shared state.
let subscriber = Subscriber {
id,
key: sub_key,
sub_rx: rx,
registry: self.clone(),
};
// Notify existing publishers about new subscriber.
for pub_tx in pub_txs.iter_mut() {
// Closed channel is fine; it means publisher has gone.
pub_tx.send(announce.clone()).await.ok();
}
trace!("registered subscriber {}", id);
pub fn register_subscriber(&self, sub_key: SubscriptionKey) -> Subscriber {
let subscriber =
self.shared_state
.write()
.register_subscriber(sub_key, self.clone(), self.chan_size);
trace!("registered subscriber {}", subscriber.id);
subscriber
}
// Unregister the subscriber
pub fn unregister_subscriber(&self, sub: &Subscriber) {
let mut ss = self.shared_state.lock().unwrap();
let announce_pack = ss.unregister_subscriber(sub.id, sub.key);
drop(ss);
// Notify publishers about the removal. Apart from wanting to do it
// outside lock, here we also spin a task as Drop impl can't be async.
if let Some((mut pub_txs, announce)) = announce_pack {
tokio::spawn(async move {
for pub_tx in pub_txs.iter_mut() {
// Closed channel is fine; it means publisher has gone.
pub_tx.send(announce.clone()).await.ok();
}
});
}
trace!("unregistered subscriber {}", sub.id);
pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
self.shared_state
.write()
.unregister_subscriber(subscriber.key);
trace!("unregistered subscriber {}", subscriber.id);
}
}
@@ -326,7 +245,7 @@ struct Subscriber {
id: SubId,
key: SubscriptionKey,
// Subscriber receives messages from publishers here.
sub_rx: Receiver<SafekeeperTimelineInfo>,
sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
// to unregister itself from shared state in Drop
registry: Registry,
}
@@ -340,12 +259,6 @@ impl Drop for Subscriber {
// Private publisher state
struct Publisher {
id: PubId,
// new subscribers request to send (or stop sending) msgs them here.
// It could be just Receiver, but weirdly it doesn't implement futures_core Stream directly.
announce_rx: ReceiverStream<SubAnnounce>,
subs_to_all: Vec<Sender<SafekeeperTimelineInfo>>,
subs_to_timelines: HashMap<TenantTimelineId, Vec<SubSender>>,
// to unregister itself from shared state in Drop
registry: Registry,
}
@@ -353,59 +266,22 @@ impl Publisher {
// Send msg to relevant subscribers.
pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
// send message to subscribers for everything
let mut cleanup_subs_to_all = false;
for sub in self.subs_to_all.iter() {
match sub.try_send(msg.clone()) {
Err(TrySendError::Full(_)) => {
warn!("dropping message, channel is full");
}
Err(TrySendError::Closed(_)) => {
cleanup_subs_to_all = true;
}
_ => (),
}
}
// some channels got closed (subscriber gone), remove them
if cleanup_subs_to_all {
self.subs_to_all.retain(|tx| !tx.is_closed());
}
let shared_state = self.registry.shared_state.read();
// Err means there is no subscribers, it is fine.
shared_state.chan_to_all_subs.send(msg.clone()).ok();
// send message to per timeline subscribers
let ttid =
parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
})?)?;
if let Some(subs) = self.subs_to_timelines.get(&ttid) {
for tx in subs.iter().map(|sub_sender| &sub_sender.1) {
if let Err(TrySendError::Full(_)) = tx.try_send(msg.clone()) {
warn!("dropping message, channel is full");
}
// closed channel is ignored here; we will be notified and remove it soon
}
if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
// Err can't happen here, as rx is destroyed after removing from the
// map the last subscriber.
subs.chan.send(msg.clone()).unwrap();
}
Ok(())
}
// Add/remove subscriber according to sub_announce.
pub fn update_sub(&mut self, sub_announce: SubAnnounce) {
match sub_announce {
SubAnnounce::AddAll(tx) => self.subs_to_all.push(tx),
SubAnnounce::AddTimeline(ttid, sub_sender) => {
match self.subs_to_timelines.entry(ttid) {
Entry::Occupied(mut o) => {
let subsenders = o.get_mut();
subsenders.push(sub_sender);
}
Entry::Vacant(v) => {
v.insert(vec![sub_sender]);
}
}
}
SubAnnounce::RemoveTimeline(ref ttid, sub_id) => {
remove_sub(&mut self.subs_to_timelines, ttid, sub_id);
}
}
}
}
impl Drop for Publisher {
@@ -429,17 +305,10 @@ impl NeonBroker for NeonBrokerImpl {
let mut stream = request.into_inner();
loop {
select! {
msg = stream.next() => {
match msg {
Some(Ok(msg)) => publisher.send_msg(&msg)?,
Some(Err(e)) => return Err(e), // grpc error from the stream
None => break // closed stream
}
}
Some(announce) = publisher.announce_rx.next() => {
publisher.update_sub(announce);
}
match stream.next().await {
Some(Ok(msg)) => publisher.send_msg(&msg)?,
Some(Err(e)) => return Err(e), // grpc error from the stream
None => break, // closed stream
}
}
@@ -458,13 +327,22 @@ impl NeonBroker for NeonBrokerImpl {
.subscription_key
.ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
let mut subscriber = self.registry.register_subscriber(sub_key).await;
let mut subscriber = self.registry.register_subscriber(sub_key);
// transform rx into stream with item = Result, as method result demands
let output = async_stream::try_stream! {
while let Some(info) = subscriber.sub_rx.recv().await {
yield info
loop {
match subscriber.sub_rx.recv().await {
Ok(info) => yield info,
Err(RecvError::Lagged(_skipped_msg)) => {
// warn!("dropped {} messages, channel is full", skipped_msg);
}
Err(RecvError::Closed) => {
// can't happen, we never drop the channel while there is a subscriber
Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
}
}
}
};
Ok(Response::new(
@@ -506,7 +384,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("version: {GIT_VERSION}");
let registry = Registry {
shared_state: Arc::new(Mutex::new(SharedState::default())),
shared_state: Arc::new(RwLock::new(SharedState::new(args.chan_size))),
chan_size: args.chan_size,
};
let neon_broker_impl = NeonBrokerImpl {
@@ -560,7 +438,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
mod tests {
use super::*;
use neon_broker::neon_broker_proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::broadcast::error::TryRecvError;
use utils::id::{TenantId, TimelineId};
fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
@@ -590,7 +468,7 @@ mod tests {
#[tokio::test]
async fn test_registry() {
let registry = Registry {
shared_state: Arc::new(Mutex::new(SharedState::default())),
shared_state: Arc::new(RwLock::new(SharedState::new(16))),
chan_size: 16,
};
@@ -600,8 +478,8 @@ mod tests {
timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
};
let sub_key_2 = SubscriptionKey::Timeline(ttid_2);
let mut subscriber_2 = registry.register_subscriber(sub_key_2).await;
let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All).await;
let mut subscriber_2 = registry.register_subscriber(sub_key_2);
let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All);
// send two messages with different keys
let msg_1 = msg(tli_from_u64(1));

View File

@@ -17,6 +17,9 @@ pub static NUM_SUBS_TIMELINE: Lazy<IntGauge> = Lazy::new(|| {
});
pub static NUM_SUBS_ALL: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!("broker_all_keys_active_subscribers", "Number of subsciptions to all keys")
.expect("Failed to register metric")
register_int_gauge!(
"broker_all_keys_active_subscribers",
"Number of subsciptions to all keys"
)
.expect("Failed to register metric")
});