Tonic based broker for SkTimelineInfo.

This commit is contained in:
Arseny Sher
2022-09-16 13:44:28 +03:00
parent 40a56a302a
commit a40e1edbf4
10 changed files with 2107 additions and 27 deletions

207
Cargo.lock generated
View File

@@ -452,13 +452,28 @@ checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"indexmap",
"once_cell",
"strsim",
"termcolor",
"textwrap",
]
[[package]]
name = "clap_derive"
version = "3.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
@@ -789,9 +804,9 @@ dependencies = [
[[package]]
name = "cxx"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f39818dcfc97d45b03953c1292efc4e80954e1583c4aa770bac1383e2310a4"
checksum = "3f83d0ebf42c6eafb8d7c52f7e5f2d3003b89c7aa4fd2b79229209459a849af8"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -801,9 +816,9 @@ dependencies = [
[[package]]
name = "cxx-build"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e580d70777c116df50c390d1211993f62d40302881e54d4b79727acb83d0199"
checksum = "07d050484b55975889284352b0ffc2ecbda25c0c55978017c132b29ba0818a86"
dependencies = [
"cc",
"codespan-reporting",
@@ -816,15 +831,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56a46460b88d1cec95112c8c363f0e2c39afdb237f60583b0b36343bf627ea9c"
checksum = "99d2199b00553eda8012dfec8d3b1c75fce747cf27c169a270b3b99e3448ab78"
[[package]]
name = "cxxbridge-macro"
version = "1.0.78"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "747b608fecf06b0d72d440f27acc99288207324b793be2c17991839f3d4995ea"
checksum = "dcb67a6de1f602736dd7eaead0080cf3435df806c61b24b13328db128c58868f"
dependencies = [
"proc-macro2",
"quote",
@@ -1002,11 +1017,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8664f6ea68aba5503d42dd1be786b0f1bd9b7972e7f40208c83ef74db91bf"
dependencies = [
"http",
"prost",
"prost 0.10.4",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tonic 0.7.2",
"tonic-build 0.7.2",
"tower",
"tower-service",
]
@@ -1516,9 +1531,9 @@ dependencies = [
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde6edd6cef363e9359ed3c98ba64590ba9eecba2293eb5a723ab32aee8926aa"
checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca"
dependencies = [
"cxx",
"cxx-build",
@@ -1644,9 +1659,9 @@ dependencies = [
[[package]]
name = "kqueue"
version = "1.0.6"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98"
dependencies = [
"kqueue-sys",
"libc",
@@ -1872,6 +1887,22 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "546c37ac5d9e56f55e73b677106873d9d9f5190605e41a856503623648488cae"
[[package]]
name = "neon_broker"
version = "0.1.0"
dependencies = [
"async-stream",
"clap",
"futures-core",
"futures-util",
"prost 0.11.0",
"tokio",
"tokio-stream",
"tonic 0.8.2",
"tonic-build 0.8.2",
"utils",
]
[[package]]
name = "nix"
version = "0.23.1"
@@ -2416,14 +2447,38 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "prettyplease"
version = "0.1.20"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fead41e178796ef8274dc612a7d8ce4c7e10ca35cd2c5b5ad24cac63aeb6c0"
checksum = "c142c0e46b57171fe0c528bee8c5b7569e80f0c17e377cd0e30ea57dbc11bb51"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -2432,9 +2487,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro2"
version = "1.0.46"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b"
checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
dependencies = [
"unicode-ident",
]
@@ -2475,7 +2530,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71adf41db68aa0daaefc69bb30bcd68ded9b9abaad5d1fbb6304c4fb390e083e"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.10.1",
]
[[package]]
name = "prost"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7"
dependencies = [
"bytes",
"prost-derive 0.11.0",
]
[[package]]
@@ -2493,8 +2558,28 @@ dependencies = [
"log",
"multimap",
"petgraph",
"prost",
"prost-types",
"prost 0.10.4",
"prost-types 0.10.1",
"regex",
"tempfile",
"which",
]
[[package]]
name = "prost-build"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb"
dependencies = [
"bytes",
"heck",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prost 0.11.0",
"prost-types 0.11.1",
"regex",
"tempfile",
"which",
@@ -2513,6 +2598,19 @@ dependencies = [
"syn",
]
[[package]]
name = "prost-derive"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.10.1"
@@ -2520,7 +2618,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68"
dependencies = [
"bytes",
"prost",
"prost 0.10.4",
]
[[package]]
name = "prost-types"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e"
dependencies = [
"bytes",
"prost 0.11.0",
]
[[package]]
@@ -3755,8 +3863,40 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"prost 0.10.4",
"prost-derive 0.10.1",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost 0.11.0",
"prost-derive 0.11.0",
"tokio",
"tokio-stream",
"tokio-util",
@@ -3775,7 +3915,20 @@ checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"prost-build 0.10.4",
"quote",
"syn",
]
[[package]]
name = "tonic-build"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c6fd7c2581e36d63388a9e04c350c21beb7a8b059580b2e93993c526899ddc"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build 0.11.1",
"quote",
"syn",
]
@@ -4311,7 +4464,7 @@ dependencies = [
"num-bigint",
"num-integer",
"num-traits",
"prost",
"prost 0.10.4",
"rand",
"regex",
"regex-syntax",

View File

@@ -11,6 +11,7 @@ cargo-features = ["named-profiles"]
[workspace]
members = [
"broker",
"compute_tools",
"control_plane",
"pageserver",

1120
broker/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

35
broker/Cargo.toml Normal file
View File

@@ -0,0 +1,35 @@
[package]
name = "neon_broker"
version = "0.1.0"
edition = "2021"
[features]
bench = []
[[bin]]
name = "neon_broker"
path = "src/broker.rs"
[[bin]]
name = "neon_broker_bench"
path = "src/bench.rs"
# build benchmarking binary only if explicitly requested with '--features bench'
# required-features = ["bench"]
[dependencies]
async-stream = "0.3"
futures-core = "0.3"
futures-util = "0.3"
tonic = "0.8"
prost = "0.11"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
# for exploring with tokio-console
# tokio = { version = "1", features = ["full", "tracing"] }
# console-subscriber = "0.1.8"
tokio-stream = "0.1"
clap = { version = "3.2.17", features = ["derive"] }
utils = { path = "../libs/utils" }
[build-dependencies]
tonic-build = "0.8"

4
broker/build.rs Normal file
View File

@@ -0,0 +1,4 @@
/// Build script entry point: compiles `proto/broker.proto` with `tonic_build`
/// so the crate's binaries can pull the generated code in through
/// `tonic::include_proto!`. Any failure is returned and aborts the build.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/broker.proto").map_err(Into::into)
}

38
broker/proto/broker.proto Normal file
View File

@@ -0,0 +1,38 @@
syntax = "proto3";
package neon_broker;
// Pub-sub broker for safekeeper timeline updates.
service NeonBroker {
// Subscribe to safekeeper updates. Server-streaming call: one request
// selecting what to listen to, answered by a stream of updates.
rpc SubscribeSafekeeperInfo(SubscribeSafekeeperInfoRequest) returns (stream SafekeeperTimelineInfo) {};
// Publish safekeeper updates. Client-streaming call: the client sends a
// stream of updates and gets a single Empty reply when the call completes.
rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (Empty) {};
}
// Request of SubscribeSafekeeperInfo: selects which updates to receive.
// Exactly one variant of subscription_key is expected; the broker server
// rejects a request with no key set (InvalidArgument).
message SubscribeSafekeeperInfoRequest {
oneof subscription_key {
Empty all = 1; // subscribe to everything
TenantTimelineId tenant_timeline_id = 2; // subscribe to specific timeline
}
}
// One update published by a safekeeper about one timeline; this is the
// payload the broker forwards from publishers to subscribers.
message SafekeeperTimelineInfo {
uint64 safekeeper_id = 1;
// Timeline the update is about. The broker routes per-timeline subscriptions
// by this field and rejects messages where it is missing.
TenantTimelineId tenant_timeline_id = 2;
uint64 last_log_term = 3;
uint64 flush_lsn = 4;
uint64 commit_lsn = 5;
uint64 backup_lsn = 6;
uint64 remote_consistent_lsn = 7;
uint64 peer_horizon_lsn = 8;
// NOTE(review): presumably the connection string of the publishing
// safekeeper (the benchmark sends "host:port") -- confirm the exact format.
string safekeeper_connstr = 9;
}
// Identifies a timeline by tenant id + timeline id.
// protobuf has no fixed-size bytes type, so both ids are variable-length on
// the wire; the broker server accepts only values that are exactly 16 bytes
// long and answers InvalidArgument otherwise.
message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
}
// Message without fields: used as the reply of PublishSafekeeperInfo and as
// the "all" variant of the subscription key.
message Empty {
}

4
broker/readme.md Normal file
View File

@@ -0,0 +1,4 @@
```
cargo build -r -p neon_broker --features bench && target/release/neon_broker
target/release/neon_broker_bench -s 1 -p 1
```

179
broker/src/bench.rs Normal file
View File

@@ -0,0 +1,179 @@
// Message types and client stubs generated from proto/broker.proto by
// build.rs. The string passed to include_proto! is the proto package name.
pub mod neon_broker {
tonic::include_proto!("neon_broker");
}
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use clap::Parser;
use neon_broker::neon_broker_client::NeonBrokerClient;
use neon_broker::subscribe_safekeeper_info_request::SubscriptionKey;
use neon_broker::TenantTimelineId as ProtoTenantTimelineId;
use neon_broker::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use tokio::time::{self, sleep};
use tonic::transport::Channel;
use tonic::Request;
// Command-line options of the broker benchmark, parsed with clap's derive API.
// Plain `//` comments are used for the additions here on purpose: `///` doc
// comments on clap items end up in the generated --help text.
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
// One publisher task is spawned per unit; each opens its own connection.
/// Number of publishers
#[clap(short = 'p', long, value_parser, default_value_t = 1)]
num_pubs: u64,
// One subscriber task (and one timeline key) per unit; publishers cycle
// through exactly this many timeline keys.
/// Number of subscribers
#[clap(short = 's', long, value_parser, default_value_t = 1)]
num_subs: u64,
}
/// Reads every counter once and returns `(sum, min)` over that single
/// snapshot. `min` is 0 when there are no counters.
fn counter_totals(counters: &[Arc<AtomicU64>]) -> (u64, u64) {
    let mut sum: u64 = 0;
    let mut min: Option<u64> = None;
    for counter in counters {
        let value = counter.load(Ordering::Relaxed);
        sum += value;
        min = Some(min.map_or(value, |m| m.min(value)));
    }
    (sum, min.unwrap_or(0))
}

/// Prints throughput statistics once per second, forever.
///
/// `counters` holds one received-message counter per subscriber. Each tick
/// prints the messages received during the last second (summed over all
/// subscribers, and for the slowest subscriber), the running totals, and the
/// average summed rate since the first message was observed.
async fn progress_reporter(counters: Vec<Arc<AtomicU64>>) {
    let mut interval = time::interval(Duration::from_millis(1000));
    let (mut c_old, mut c_min_old) = counter_totals(&counters);
    // Set when the first message is seen; the average is measured from there.
    let mut started_at: Option<Instant> = None;
    // Messages already counted at `started_at`; excluded from the average.
    let mut skipped: u64 = 0;
    loop {
        interval.tick().await;

        let (c_new, c_min_new) = counter_totals(&counters);
        if c_new > 0 && started_at.is_none() {
            started_at = Some(Instant::now());
            skipped = c_new;
        }
        let avg_rps = started_at.map(|s| {
            let dur = s.elapsed();
            // Full-precision seconds: truncating to whole milliseconds made
            // the very first sample divide 0 by 0 and print NaN.
            let avg_rps = (c_new - skipped) as f64 / dur.as_secs_f64();
            (dur, avg_rps)
        });
        println!(
            "sum rps {}, min rps {} total {}, total min {}, duration, avg sum rps {:?}",
            c_new - c_old,
            c_min_new - c_min_old,
            c_new,
            c_min_new,
            avg_rps
        );
        c_old = c_new;
        c_min_old = c_min_new;
    }
}
/// Builds a 16-byte timeline id for benchmark key `i`: eight `0xFF` bytes
/// followed by `i` in big-endian byte order.
fn tli_from_u64(i: u64) -> Vec<u8> {
    [[0xFF_u8; 8], i.to_be_bytes()].concat()
}
/// Benchmark subscriber: subscribes to the single timeline derived from `i`
/// and bumps `counter` once per received message until the stream ends.
///
/// Uses `client` when given, otherwise opens a fresh connection to the local
/// broker. Connection and stream errors panic (benchmark code).
async fn subscribe(client: Option<NeonBrokerClient<Channel>>, counter: Arc<AtomicU64>, i: u64) {
    let mut client = if let Some(existing) = client {
        existing
    } else {
        NeonBrokerClient::connect("http://[::1]:50051")
            .await
            .unwrap()
    };

    let ttid = ProtoTenantTimelineId {
        tenant_id: vec![0xFF; 16],
        timeline_id: tli_from_u64(i),
    };
    let request = SubscribeSafekeeperInfoRequest {
        subscription_key: Some(SubscriptionKey::TenantTimelineId(ttid)),
    };

    let response = client.subscribe_safekeeper_info(request).await.unwrap();
    let mut stream = response.into_inner();
    loop {
        match stream.message().await.unwrap() {
            Some(_info) => {
                counter.fetch_add(1, Ordering::Relaxed);
            }
            // The server closed the stream.
            None => break,
        }
    }
}
/// Benchmark publisher: streams an endless sequence of `SafekeeperTimelineInfo`
/// messages to the broker, cycling the timeline key through `n_keys` values
/// (`tli_from_u64(0)` .. `tli_from_u64(n_keys - 1)`).
///
/// Uses `client` when given, otherwise opens a fresh connection to the local
/// broker. Connection and RPC errors panic (benchmark code).
async fn publish(client: Option<NeonBrokerClient<Channel>>, n_keys: u64) {
    let mut client = match client {
        Some(c) => c,
        None => NeonBrokerClient::connect("http://[::1]:50051")
            .await
            .unwrap(),
    };
    // `n_keys` is the number of subscribers, which may be 0 (`-s 0`); clamp it
    // so the `%` below cannot panic with a zero divisor.
    let n_keys = n_keys.max(1);
    let mut counter: u64 = 0;

    // Stream producing new values as fast as the RPC consumes them.
    let outbound = async_stream::stream! {
        loop {
            let info = SafekeeperTimelineInfo {
                safekeeper_id: 1,
                tenant_timeline_id: Some(ProtoTenantTimelineId {
                    tenant_id: vec![0xFF; 16],
                    timeline_id: tli_from_u64(counter % n_keys),
                }),
                last_log_term: 0,
                // The message sequence number doubles as flush_lsn.
                flush_lsn: counter,
                commit_lsn: 2,
                backup_lsn: 3,
                remote_consistent_lsn: 4,
                peer_horizon_lsn: 5,
                safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
            };
            counter += 1;
            yield info;
        }
    };

    let _response = client
        .publish_safekeeper_info(Request::new(outbound))
        .await
        .unwrap();
}
/// Benchmark entry point: spawns the progress reporter, `num_subs` subscriber
/// tasks sharing one connection, and `num_pubs` publisher tasks that each open
/// their own connection; then waits on the reporter task.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    // One received-message counter per subscriber.
    let counters: Vec<Arc<AtomicU64>> = (0..args.num_subs)
        .map(|_| Arc::new(AtomicU64::new(0)))
        .collect();
    let reporter = tokio::spawn(progress_reporter(counters.clone()));

    let shared_client = NeonBrokerClient::connect("http://[::1]:50051")
        .await
        .unwrap();

    for (i, counter) in (0..args.num_subs).zip(counters.iter()) {
        tokio::spawn(subscribe(Some(shared_client.clone()), counter.clone(), i));
    }

    for _ in 0..args.num_pubs {
        // Publishers cycle over as many timeline keys as there are subscribers.
        tokio::spawn(publish(None, args.num_subs));
    }

    reporter.await?;
    Ok(())
}

526
broker/src/broker.rs Normal file
View File

@@ -0,0 +1,526 @@
//! Simple pub-sub based on grpc (tonic) and Tokio mpsc for messaging between
//! storage nodes. The main design goal is to avoid central synchronization
//! during normal flow, resorting to it only when the set of publishers or
//! subscribers changes. Each subscriber holds the rx end of an mpsc channel on
//! which it receives messages; the tx end is sent to existing publishers and
//! saved in shared state for new ones. Each publisher locally maintains the
//! set of subscribers it streams messages to.
//!
//! Subscriptions to 1) a single timeline 2) everything are possible. We could
//! add subscription to a set of timelines to save grpc streams, but testing
//! shows that many individual streams are also ok.
//!
//! A message is dropped if a subscriber can't consume it, without affecting
//! other subscribers.
//!
//! Only the safekeeper message is supported, but it is not hard to add
//! something else with templating.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
use futures_core::Stream;
use futures_util::StreamExt;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::{select, time};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Code;
use tonic::{transport::Server, Request, Response, Status};
use neon_broker_proto::neon_broker_server::{NeonBroker, NeonBrokerServer};
use neon_broker_proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use neon_broker_proto::TenantTimelineId as ProtoTenantTimelineId;
use neon_broker_proto::{Empty, SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
// Message types and server/client stubs generated from proto/broker.proto at
// build time (see build.rs).
pub mod neon_broker_proto {
// The string specified here must match the proto package name.
// If you want to have a look at the generated code, it is at path similar to
// target/debug/build/neon_broker-0fde81d03bedc3b2/out/neon_broker.rs
tonic::include_proto!("neon_broker");
}
// Max size of the queue to the subscriber. Publishers use try_send, so once a
// subscriber has this many unread messages further ones are dropped for it.
const CHAN_SIZE: usize = 256;
// Both ids are handed out sequentially from counters in SharedState.
type PubId = u64; // id of publisher for registering in maps
type SubId = u64; // id of subscriber for registering in maps
// What a subscriber asked to receive; the validated, fixed-size counterpart
// of the protobuf subscription key.
#[derive(Copy, Clone)]
enum SubscriptionKey {
// Updates for every timeline.
All,
// Updates for this tenant/timeline pair only.
Timeline(TenantTimelineId),
}
impl SubscriptionKey {
    // Convert the protobuf subscription key into the internal one. protobuf
    // has no fixed-size bytes type, so timeline ids arrive as vectors and are
    // validated here; a malformed id yields the InvalidArgument status
    // produced by parse_proto_ttid.
    pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
        let parsed = match key {
            ProtoSubscriptionKey::TenantTimelineId(ref raw_ttid) => {
                Self::Timeline(parse_proto_ttid(raw_ttid)?)
            }
            ProtoSubscriptionKey::All(_) => Self::All,
        };
        Ok(parsed)
    }
}
// Subscriber id + tx end of the channel for messages to it.
// The id (field 0) is what removal requests are matched against; the sender
// (field 1) is what publishers push messages into.
#[derive(Clone)]
struct SubSender(SubId, Sender<SafekeeperTimelineInfo>);
impl fmt::Debug for SubSender {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Subscription id {}", self.0)
}
}
// Announcements a subscriber sends to publisher(s), asking them to stream to
// the provided channel, or to forget about it, releasing memory.
#[derive(Clone)]
enum SubAnnounce {
AddAll(Sender<SafekeeperTimelineInfo>), // add subscription to all timelines
AddTimeline(TenantTimelineId, SubSender), // add subscription to the specific timeline
RemoveTimeline(TenantTimelineId, SubId), // remove subscription to the specific timeline
// RemoveAll is not needed as publisher will notice closed channel while
// trying to send the next message.
}
// Central registry of publishers and subscribers, kept behind a mutex in
// Registry. It is touched only when a publisher or subscriber joins or
// leaves, not for message delivery.
struct SharedState {
// Registered publishers. They sit on the rx end of these channels and
// receive through it tx handles of chans to subscribers.
//
// Note: publishers don't identify which keys they publish, so each
// publisher will receive channels to all subs and filter them before sending.
pub_txs: HashMap<PubId, Sender<SubAnnounce>>,
// Id the next registered publisher will get.
next_pub_id: PubId,
// Registered subscribers -- when publisher joins it walks over them,
// collecting txs to send messages.
subs_to_all: HashMap<SubId, Sender<SafekeeperTimelineInfo>>,
// Per-timeline subscribers; a timeline with no subscribers has no entry.
subs_to_timelines: HashMap<TenantTimelineId, Vec<SubSender>>,
// Id the next registered subscriber will get.
next_sub_id: SubId,
}
// Utility func to remove subscription from the map
fn remove_sub(
subs_to_timelines: &mut HashMap<TenantTimelineId, Vec<SubSender>>,
ttid: &TenantTimelineId,
sub_id: SubId,
) {
if let Some(subsenders) = subs_to_timelines.get_mut(&ttid) {
subsenders.retain(|ss| ss.0 != sub_id);
if subsenders.len() == 0 {
subs_to_timelines.remove(&ttid);
}
}
// Note that subscription might be not here if subscriber task was aborted
// earlier than it managed to notify publisher about itself.
}
impl SharedState {
    // Register new publisher: remember the tx end of its announce channel and
    // hand out a fresh id.
    pub fn register_publisher(&mut self, announce_tx: Sender<SubAnnounce>) -> PubId {
        let pub_id = self.next_pub_id;
        self.next_pub_id = pub_id + 1;
        assert!(!self.pub_txs.contains_key(&pub_id));
        self.pub_txs.insert(pub_id, announce_tx);
        pub_id
    }

    // Forget the publisher; it must have been registered.
    pub fn unregister_publisher(&mut self, pub_id: PubId) {
        assert!(self.pub_txs.contains_key(&pub_id));
        self.pub_txs.remove(&pub_id);
    }

    // Register new subscriber.
    //
    // Returns the subscriber's id, the announce channels of all currently
    // registered publishers and the announcement to deliver through them. The
    // caller does the delivery after releasing the lock; sending from here
    // would risk a deadlock.
    pub fn register_subscriber(
        &mut self,
        sub_key: SubscriptionKey,
        sub_tx: Sender<SafekeeperTimelineInfo>,
    ) -> (SubId, Vec<Sender<SubAnnounce>>, SubAnnounce) {
        let sub_id = self.next_sub_id;
        self.next_sub_id = sub_id + 1;

        let announce = match sub_key {
            SubscriptionKey::Timeline(ttid) => {
                self.subs_to_timelines
                    .entry(ttid)
                    .or_default()
                    .push(SubSender(sub_id, sub_tx.clone()));
                SubAnnounce::AddTimeline(ttid, SubSender(sub_id, sub_tx))
            }
            SubscriptionKey::All => {
                assert!(!self.subs_to_all.contains_key(&sub_id));
                self.subs_to_all.insert(sub_id, sub_tx.clone());
                SubAnnounce::AddAll(sub_tx)
            }
        };

        // Collect existing publishers to notify them after lock is released.
        // TODO: the probability of channels being full here is tiny (a
        // publisher always blocks listening on its channel), so we could try
        // sending first and resort to cloning only if needed.
        //
        // Deadlock is possible only if a publisher accesses shared state
        // during its lifetime, e.g. if we add maintenance of the set of
        // published timelines. Otherwise we could just await here (but the
        // lock would have to be replaced with the Tokio one).
        //
        // We could also just error out if some channel is full, but that
        // needs cleanup of the incomplete job, and notifying publishers when
        // unregistering is mandatory anyway.
        let publishers: Vec<Sender<SubAnnounce>> = self.pub_txs.values().cloned().collect();
        (sub_id, publishers, announce)
    }

    // Unregister the subscriber. Similar to register_subscriber, returns the
    // channels through which publishers must be notified about the removal,
    // together with the announcement to send.
    //
    // Only per timeline subscriptions need the notification, so None is
    // returned for the 'all' kind: publishers detect those on their own
    // through closed channels.
    pub fn unregister_subscriber(
        &mut self,
        sub_id: SubId,
        sub_key: SubscriptionKey,
    ) -> Option<(Vec<Sender<SubAnnounce>>, SubAnnounce)> {
        match sub_key {
            SubscriptionKey::All => {
                assert!(self.subs_to_all.contains_key(&sub_id));
                self.subs_to_all.remove(&sub_id);
                None
            }
            SubscriptionKey::Timeline(ttid) => {
                remove_sub(&mut self.subs_to_timelines, &ttid, sub_id);
                let publishers: Vec<Sender<SubAnnounce>> =
                    self.pub_txs.values().cloned().collect();
                Some((publishers, SubAnnounce::RemoveTimeline(ttid, sub_id)))
            }
        }
    }

    // Print current registry sizes to stdout. The last number is the count of
    // timelines having at least one subscriber (entries of the map).
    pub fn report(&mut self) {
        let publishers = self.pub_txs.len();
        let all_subs = self.subs_to_all.len();
        let timelines = self.subs_to_timelines.len();
        println!(
            "registered {} publishers, {} subs to all, {} subs to timelines",
            publishers, all_subs, timelines,
        );
    }
}
// SharedState wrapper for post-locking operations (sending to pub_tx chans).
// Cloning is cheap and every clone refers to the same shared state.
#[derive(Clone)]
struct Registry {
shared_state: Arc<Mutex<SharedState>>,
}
impl Registry {
// Register new publisher in shared state.
pub fn register_publisher(&self) -> Publisher {
let (announce_tx, announce_rx) = mpsc::channel(128);
let mut ss = self.shared_state.lock().unwrap();
let id = ss.register_publisher(announce_tx);
let (subs_to_all, subs_to_timelines) = (
ss.subs_to_all.values().cloned().collect(),
ss.subs_to_timelines.clone(),
);
drop(ss);
// println!("registered publisher {}", id);
Publisher {
id,
announce_rx: announce_rx.into(),
subs_to_all,
subs_to_timelines,
registry: self.clone(),
}
}
pub fn unregister_publisher(&self, publisher: &Publisher) {
self.shared_state
.lock()
.unwrap()
.unregister_publisher(publisher.id);
// println!("unregistered publisher {}", publisher.id);
}
// Register new subscriber in shared state.
pub async fn register_subscriber(&self, sub_key: SubscriptionKey) -> Subscriber {
let (tx, rx) = mpsc::channel(CHAN_SIZE);
let id;
let mut pub_txs;
let announce;
{
let mut ss = self.shared_state.lock().unwrap();
(id, pub_txs, announce) = ss.register_subscriber(sub_key, tx);
}
// Note: it is important to create Subscriber before .await. If client
// disconnects during await, which would terminate the Future we still
// need to run Subscriber's drop() which will unregister it from the
// shared state.
let subscriber = Subscriber {
id,
key: sub_key,
sub_rx: rx,
registry: self.clone(),
};
// Notify existing publishers about new subscriber.
for pub_tx in pub_txs.iter_mut() {
// Closed channel is fine; it means publisher has gone.
pub_tx.send(announce.clone()).await.ok();
}
// println!("registered subscriber {}", id);
subscriber
}
// Unregister the subscriber
pub fn unregister_subscriber(&self, sub: &Subscriber) {
let mut ss = self.shared_state.lock().unwrap();
let announce_pack = ss.unregister_subscriber(sub.id, sub.key);
drop(ss);
// Notify publishers about the removal. Apart from wanting to do it
// outside lock, here we also spin a task as Drop impl can't be async.
if let Some((mut pub_txs, announce)) = announce_pack {
tokio::spawn(async move {
for pub_tx in pub_txs.iter_mut() {
// Closed channel is fine; it means publisher has gone.
pub_tx.send(announce.clone()).await.ok();
}
});
}
// println!("unregistered subscriber {}", sub.id);
}
pub async fn report(&self) {
let mut interval = time::interval(Duration::from_millis(1000));
loop {
interval.tick().await;
self.shared_state.lock().unwrap().report();
}
}
}
// Private subscriber state. Lives as long as the subscription; dropping it
// unregisters the subscriber from the shared state.
struct Subscriber {
// Id assigned at registration.
id: SubId,
// What this subscriber is subscribed to; needed again for unregistering.
key: SubscriptionKey,
// Subscriber receives messages from publishers here.
sub_rx: Receiver<SafekeeperTimelineInfo>,
// to unregister itself from shared state in Drop
registry: Registry,
}
// Unregisters the subscriber when it goes away, whichever way that happens.
// For per-timeline subscriptions this spawns a Tokio task that notifies the
// publishers.
impl Drop for Subscriber {
fn drop(&mut self) {
self.registry.unregister_subscriber(self);
}
}
// Private publisher state. Each publisher keeps its own copy of the
// subscriber lists so that sending messages needs no shared lock.
struct Publisher {
// Id assigned at registration.
id: PubId,
// Subscribers ask here to start (or stop) having messages sent to them.
// It could be just Receiver, but weirdly it doesn't implement futures_core Stream directly.
announce_rx: ReceiverStream<SubAnnounce>,
// Channels to subscribers of everything.
subs_to_all: Vec<Sender<SafekeeperTimelineInfo>>,
// Channels to subscribers of particular timelines.
subs_to_timelines: HashMap<TenantTimelineId, Vec<SubSender>>,
// to unregister itself from shared state in Drop
registry: Registry,
}
impl Publisher {
// Send msg to relevant subscribers.
pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
// send message to subscribers for everything
let mut cleanup_subs_to_all = false;
for sub in self.subs_to_all.iter() {
match sub.try_send(msg.clone()) {
Err(TrySendError::Full(_)) => {
// println!("dropping message, channel is full");
}
Err(TrySendError::Closed(_)) => {
cleanup_subs_to_all = true;
}
_ => (),
}
}
// some channels got closed (subscriber gone), remove them
if cleanup_subs_to_all {
self.subs_to_all.retain(|tx| !tx.is_closed());
}
// send message to per timeline subscribers
let ttid = parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or(Status::new(
Code::InvalidArgument,
"missing tenant_timeline_id",
))?)?;
if let Some(subs) = self.subs_to_timelines.get(&ttid) {
for tx in subs.iter().map(|sub_sender| &sub_sender.1) {
if let Err(TrySendError::Full(_)) = tx.try_send(msg.clone()) {
// println!("dropping message, channel is full");
}
// closed channel is ignored here; we will be notified and remove it soon
}
}
Ok(())
}
// Add/remove subscriber according to sub_announce.
pub fn update_sub(&mut self, sub_announce: SubAnnounce) {
match sub_announce {
SubAnnounce::AddAll(tx) => self.subs_to_all.push(tx),
SubAnnounce::AddTimeline(ttid, sub_sender) => {
match self.subs_to_timelines.entry(ttid) {
Entry::Occupied(mut o) => {
let subsenders = o.get_mut();
subsenders.push(sub_sender);
}
Entry::Vacant(v) => {
v.insert(vec![sub_sender]);
}
}
}
SubAnnounce::RemoveTimeline(ref ttid, sub_id) => {
remove_sub(&mut self.subs_to_timelines, ttid, sub_id);
}
}
}
}
// Removes the publisher from the shared state when it goes away, so that new
// subscribers stop announcing themselves to it.
impl Drop for Publisher {
fn drop(&mut self) {
self.registry.unregister_publisher(self);
}
}
// gRPC service implementation; all state lives in the registry.
struct NeonBrokerService {
registry: Registry,
}
#[tonic::async_trait]
impl NeonBroker for NeonBrokerService {
    // Handle a stream of updates from one publisher: forward each incoming
    // message to the interested subscribers while keeping the publisher's
    // local subscriber lists up to date with incoming announcements.
    //
    // Ends with an Empty response when the client closes the stream, or with
    // an error on a stream failure or an invalid message. Either way the
    // publisher is unregistered when it is dropped.
    async fn publish_safekeeper_info(
        &self,
        request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
    ) -> Result<Response<Empty>, Status> {
        let mut publisher = self.registry.register_publisher();
        let mut inbound = request.into_inner();

        loop {
            select! {
                item = inbound.next() => {
                    match item {
                        // closed stream
                        None => break,
                        // grpc error from the stream
                        Some(Err(status)) => return Err(status),
                        Some(Ok(info)) => publisher.send_msg(&info)?,
                    }
                }
                Some(announce) = publisher.announce_rx.next() => {
                    publisher.update_sub(announce);
                }
            }
        }

        Ok(Response::new(Empty {}))
    }

    type SubscribeSafekeeperInfoStream =
        Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;

    // Register a subscriber for the requested key and stream to the client
    // everything publishers push into its channel.
    //
    // Returns InvalidArgument if the subscription key is missing or
    // malformed. The subscriber is owned by the returned stream, so dropping
    // the stream unregisters it.
    async fn subscribe_safekeeper_info(
        &self,
        request: Request<SubscribeSafekeeperInfoRequest>,
    ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
        let proto_key = request
            .into_inner()
            .subscription_key
            .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
        let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
        let mut subscriber = self.registry.register_subscriber(sub_key).await;

        // transform rx into stream with item = Result, as method result demands
        let output = async_stream::try_stream! {
            while let Some(info) = subscriber.sub_rx.recv().await {
                yield info
            }
        };

        let boxed: Self::SubscribeSafekeeperInfoStream = Box::pin(output);
        Ok(Response::new(boxed))
    }
}
// Broker entry point: creates the empty registry, starts the periodic
// statistics reporter and serves the gRPC service on [::1]:50051 until the
// server stops.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;

    let initial_state = SharedState {
        pub_txs: HashMap::new(),
        next_pub_id: 0,
        subs_to_all: HashMap::new(),
        subs_to_timelines: HashMap::new(),
        next_sub_id: 0,
    };
    let registry = Registry {
        shared_state: Arc::new(Mutex::new(initial_state)),
    };

    // The reporter gets its own handle to the same shared state.
    let reporter = registry.clone();
    tokio::spawn(async move { reporter.report().await });

    let neon_broker_service = NeonBrokerService { registry };
    Server::builder()
        .http2_keepalive_interval(Some(Duration::from_millis(5000)))
        .add_service(NeonBrokerServer::new(neon_broker_service))
        .serve(addr)
        .await?;

    Ok(())
}
// Parse the variable length byte fields of the protobuf id into the fixed
// size TenantTimelineId. Returns InvalidArgument naming the offending field
// if either of them can't be converted.
fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
    let tenant_id = match TenantId::from_vec(&proto_ttid.tenant_id) {
        Ok(id) => id,
        Err(e) => {
            return Err(Status::new(
                Code::InvalidArgument,
                format!("malformed tenant_id: {}", e),
            ));
        }
    };
    let timeline_id = match TimelineId::from_vec(&proto_ttid.timeline_id) {
        Ok(id) => id,
        Err(e) => {
            return Err(Status::new(
                Code::InvalidArgument,
                format!("malformed timeline_id: {}", e),
            ));
        }
    };
    Ok(TenantTimelineId {
        tenant_id,
        timeline_id,
    })
}

View File

@@ -3,6 +3,13 @@ use std::{fmt, str::FromStr};
use hex::FromHex;
use rand::Rng;
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Error returned when raw bytes cannot be converted into an id.
#[derive(Error, Debug)]
pub enum IdError {
/// The input was not exactly 16 bytes long; carries the actual length.
#[error("invalid id length {0}")]
VecParseError(usize),
}
/// Neon ID is a 128-bit random ID.
/// Used to represent various identifiers. Provides handy utility methods and impls.
@@ -22,6 +29,15 @@ impl Id {
Id::from(arr)
}
pub fn from_vec(src: &Vec<u8>) -> Result<Id, IdError> {
if src.len() != 16 {
return Err(IdError::VecParseError(src.len()));
}
let mut zid_slice = [0u8; 16];
zid_slice.copy_from_slice(&src);
Ok(zid_slice.into())
}
pub fn as_arr(&self) -> [u8; 16] {
self.0
}
@@ -100,6 +116,10 @@ macro_rules! id_newtype {
$t(Id::get_from_buf(buf))
}
/// Builds the id from a vector that must be exactly 16 bytes long;
/// see `Id::from_vec` for the error returned otherwise.
pub fn from_vec(src: &Vec<u8>) -> Result<$t, IdError> {
    Id::from_vec(src).map($t)
}
pub fn as_arr(&self) -> [u8; 16] {
self.0.as_arr()
}