mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
Add storage_broker binary.
Which ought to replace etcd. This patch only adds the binary and adjusts Dockerfile to include it; subsequent ones will add deploy of helm chart and the actual replacement. It is a simple and fast pub-sub message bus. In this patch only safekeeper message is supported, but others can be easily added. Compilation now requires protoc to be installed. Installing protobuf-compiler package is fine for Debian/Ubuntu. ref https://github.com/neondatabase/neon/pull/2733 https://github.com/neondatabase/neon/issues/2394
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
!pgxn/
|
||||
!proxy/
|
||||
!safekeeper/
|
||||
!storage_broker/
|
||||
!vendor/postgres-v14/
|
||||
!vendor/postgres-v15/
|
||||
!workspace_hack/
|
||||
|
||||
1
.github/ansible/get_binaries.sh
vendored
1
.github/ansible/get_binaries.sh
vendored
@@ -25,6 +25,7 @@ mkdir neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/bin/pageserver neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/bin/pageserver_binutils neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/bin/safekeeper neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/bin/neon_broker neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/bin/proxy neon_install/bin/
|
||||
docker cp ${ID}:/usr/local/v14/bin/ neon_install/v14/bin/
|
||||
docker cp ${ID}:/usr/local/v15/bin/ neon_install/v15/bin/
|
||||
|
||||
4
.github/workflows/codestyle.yml
vendored
4
.github/workflows/codestyle.yml
vendored
@@ -48,11 +48,11 @@ jobs:
|
||||
if: matrix.os == 'ubuntu-latest'
|
||||
run: |
|
||||
sudo apt update
|
||||
sudo apt install build-essential libreadline-dev zlib1g-dev flex bison libseccomp-dev libssl-dev
|
||||
sudo apt install build-essential libreadline-dev zlib1g-dev flex bison libseccomp-dev libssl-dev protobuf-compiler
|
||||
|
||||
- name: Install macOS postgres dependencies
|
||||
if: matrix.os == 'macos-latest'
|
||||
run: brew install flex bison openssl
|
||||
run: brew install flex bison openssl protobuf
|
||||
|
||||
- name: Set pg 14 revision for caching
|
||||
id: pg_v14_rev
|
||||
|
||||
188
Cargo.lock
generated
188
Cargo.lock
generated
@@ -457,11 +457,26 @@ checksum = "6bf8832993da70a4c6d13c581f4463c2bdda27b9bf1c5498dc4365543abe6d6f"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"bitflags",
|
||||
"clap_derive",
|
||||
"clap_lex 0.3.0",
|
||||
"once_cell",
|
||||
"strsim",
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_derive"
|
||||
version = "4.0.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c42f169caba89a7d512b5418b09864543eeb4d497416c917d7137863bd2076ad"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro-error",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "0.2.4"
|
||||
@@ -1005,11 +1020,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9fb8664f6ea68aba5503d42dd1be786b0f1bd9b7972e7f40208c83ef74db91bf"
|
||||
dependencies = [
|
||||
"http",
|
||||
"prost",
|
||||
"prost 0.10.4",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"tonic 0.7.2",
|
||||
"tonic-build 0.7.2",
|
||||
"tower",
|
||||
"tower-service",
|
||||
]
|
||||
@@ -2464,6 +2479,30 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
|
||||
dependencies = [
|
||||
"proc-macro-error-attr",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error-attr"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-hack"
|
||||
version = "0.5.19"
|
||||
@@ -2515,7 +2554,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71adf41db68aa0daaefc69bb30bcd68ded9b9abaad5d1fbb6304c4fb390e083e"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
"prost-derive 0.10.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a0841812012b2d4a6145fae9a6af1534873c32aa67fff26bd09f8fa42c83f95a"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive 0.11.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2533,13 +2582,35 @@ dependencies = [
|
||||
"log",
|
||||
"multimap",
|
||||
"petgraph",
|
||||
"prost",
|
||||
"prost-types",
|
||||
"prost 0.10.4",
|
||||
"prost-types 0.10.1",
|
||||
"regex",
|
||||
"tempfile",
|
||||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-build"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d8b442418ea0822409d9e7d047cbf1e7e9e1760b172bf9982cf29d517c93511"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"heck",
|
||||
"itertools",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"multimap",
|
||||
"petgraph",
|
||||
"prettyplease",
|
||||
"prost 0.11.2",
|
||||
"prost-types 0.11.2",
|
||||
"regex",
|
||||
"syn",
|
||||
"tempfile",
|
||||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.10.1"
|
||||
@@ -2553,6 +2624,19 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "164ae68b6587001ca506d3bf7f1000bfa248d0e1217b618108fba4ec1d0cc306"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.10.1"
|
||||
@@ -2560,7 +2644,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost",
|
||||
"prost 0.10.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-types"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "747761bc3dc48f9a34553bf65605cf6cb6288ba219f3450b4275dbd81539551a"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost 0.11.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3422,6 +3516,32 @@ version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
|
||||
|
||||
[[package]]
|
||||
name = "storage_broker"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"bytes",
|
||||
"clap 4.0.15",
|
||||
"futures",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"git-version",
|
||||
"humantime",
|
||||
"hyper",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"prost 0.11.2",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.8.2",
|
||||
"tonic-build 0.8.2",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "str_stack"
|
||||
version = "0.1.0"
|
||||
@@ -3822,8 +3942,40 @@ dependencies = [
|
||||
"hyper-timeout",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"prost-derive",
|
||||
"prost 0.10.4",
|
||||
"prost-derive 0.10.1",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"hyper-timeout",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost 0.11.2",
|
||||
"prost-derive 0.11.2",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
@@ -3842,7 +3994,20 @@ checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1"
|
||||
dependencies = [
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
"prost-build",
|
||||
"prost-build 0.10.4",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic-build"
|
||||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "48c6fd7c2581e36d63388a9e04c350c21beb7a8b059580b2e93993c526899ddc"
|
||||
dependencies = [
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
"prost-build 0.11.2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
@@ -4401,7 +4566,8 @@ dependencies = [
|
||||
"num-bigint",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
"prost",
|
||||
"prost 0.10.4",
|
||||
"prost 0.11.2",
|
||||
"rand",
|
||||
"regex",
|
||||
"regex-syntax",
|
||||
|
||||
@@ -16,6 +16,7 @@ members = [
|
||||
"pageserver",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"storage_broker",
|
||||
"workspace_hack",
|
||||
"libs/*",
|
||||
]
|
||||
|
||||
@@ -44,7 +44,7 @@ COPY . .
|
||||
# Show build caching stats to check if it was used in the end.
|
||||
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
|
||||
RUN set -e \
|
||||
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin proxy --locked --release \
|
||||
&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin storage_broker --bin proxy --locked --release \
|
||||
&& cachepot -s
|
||||
|
||||
# Build final image
|
||||
@@ -67,6 +67,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
|
||||
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
|
||||
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
|
||||
|
||||
@@ -35,12 +35,12 @@ Pageserver consists of:
|
||||
* On Ubuntu or Debian, this set of packages should be sufficient to build the code:
|
||||
```bash
|
||||
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
|
||||
libssl-dev clang pkg-config libpq-dev etcd cmake postgresql-client
|
||||
libssl-dev clang pkg-config libpq-dev etcd cmake postgresql-client protobuf-compiler
|
||||
```
|
||||
* On Fedora, these packages are needed:
|
||||
```bash
|
||||
dnf install flex bison readline-devel zlib-devel openssl-devel \
|
||||
libseccomp-devel perl clang cmake etcd postgresql postgresql-contrib
|
||||
libseccomp-devel perl clang cmake etcd postgresql postgresql-contrib protobuf-compiler
|
||||
```
|
||||
|
||||
2. [Install Rust](https://www.rust-lang.org/tools/install)
|
||||
|
||||
@@ -2,6 +2,11 @@
|
||||
|
||||
Below you will find a brief overview of each subdir in the source tree in alphabetical order.
|
||||
|
||||
`storage_broker`:
|
||||
|
||||
Neon storage broker, providing messaging between safekeepers and pageservers.
|
||||
[storage_broker.md](./storage_broker.md)
|
||||
|
||||
`/control_plane`:
|
||||
|
||||
Local control plane.
|
||||
|
||||
27
docs/storage_broker.md
Normal file
27
docs/storage_broker.md
Normal file
@@ -0,0 +1,27 @@
|
||||
# Storage broker
|
||||
|
||||
Storage broker targets two issues:
|
||||
- Allowing safekeepers and pageservers learn which nodes also hold their
|
||||
timelines, and timeline statuses there.
|
||||
- Avoiding O(n^2) connections between storage nodes while doing so.
|
||||
|
||||
This is used
|
||||
- By pageservers to determine the most advanced and alive safekeeper to pull WAL from.
|
||||
- By safekeepers to synchronize on the timeline: advance
|
||||
`remote_consistent_lsn`, `backup_lsn`, choose who offloads WAL to s3.
|
||||
|
||||
Technically, it is a simple stateless pub-sub message broker based on tonic
|
||||
(grpc) making multiplexing easy. Since it is stateless, fault tolerance can be
|
||||
provided by k8s; there is no built in replication support, though it is not hard
|
||||
to add.
|
||||
|
||||
Currently, the only message is `SafekeeperTimelineInfo`. Each safekeeper, for
|
||||
each active timeline, once in a while pushes timeline status to the broker.
|
||||
Other nodes subscribe and receive this info, using it per above.
|
||||
|
||||
Broker serves /metrics on the same port as grpc service.
|
||||
|
||||
grpcurl can be used to check which values are currently being pushed:
|
||||
```
|
||||
grpcurl -proto broker/proto/broker.proto -d '{"all":{}}' -plaintext localhost:50051 storage_broker.NeonBroker/SubscribeSafekeeperInfo
|
||||
```
|
||||
@@ -3,6 +3,13 @@ use std::{fmt, str::FromStr};
|
||||
use hex::FromHex;
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum IdError {
|
||||
#[error("invalid id length {0}")]
|
||||
SliceParseError(usize),
|
||||
}
|
||||
|
||||
/// Neon ID is a 128-bit random ID.
|
||||
/// Used to represent various identifiers. Provides handy utility methods and impls.
|
||||
@@ -22,6 +29,15 @@ impl Id {
|
||||
Id::from(arr)
|
||||
}
|
||||
|
||||
pub fn from_slice(src: &[u8]) -> Result<Id, IdError> {
|
||||
if src.len() != 16 {
|
||||
return Err(IdError::SliceParseError(src.len()));
|
||||
}
|
||||
let mut id_array = [0u8; 16];
|
||||
id_array.copy_from_slice(src);
|
||||
Ok(id_array.into())
|
||||
}
|
||||
|
||||
pub fn as_arr(&self) -> [u8; 16] {
|
||||
self.0
|
||||
}
|
||||
@@ -100,6 +116,10 @@ macro_rules! id_newtype {
|
||||
$t(Id::get_from_buf(buf))
|
||||
}
|
||||
|
||||
pub fn from_slice(src: &[u8]) -> Result<$t, IdError> {
|
||||
Ok($t(Id::from_slice(src)?))
|
||||
}
|
||||
|
||||
pub fn as_arr(&self) -> [u8; 16] {
|
||||
self.0.as_arr()
|
||||
}
|
||||
|
||||
37
storage_broker/Cargo.toml
Normal file
37
storage_broker/Cargo.toml
Normal file
@@ -0,0 +1,37 @@
|
||||
[package]
|
||||
name = "storage_broker"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[features]
|
||||
bench = []
|
||||
|
||||
[dependencies]
|
||||
async-stream = "0.3"
|
||||
bytes = "1.0"
|
||||
clap = { version = "4.0", features = ["derive"] }
|
||||
futures = "0.3"
|
||||
futures-core = "0.3"
|
||||
futures-util = "0.3"
|
||||
git-version = "0.3.5"
|
||||
humantime = "2.1.0"
|
||||
hyper = {version = "0.14.14", features = ["full"]}
|
||||
once_cell = "1.13.0"
|
||||
parking_lot = "0.12"
|
||||
prost = "0.11"
|
||||
tonic = "0.8"
|
||||
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
|
||||
tokio-stream = "0.1"
|
||||
tracing = "0.1.27"
|
||||
|
||||
metrics = { path = "../libs/metrics" }
|
||||
utils = { path = "../libs/utils" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.8"
|
||||
|
||||
[[bench]]
|
||||
name = "rps"
|
||||
harness = false
|
||||
|
||||
174
storage_broker/benches/rps.rs
Normal file
174
storage_broker/benches/rps.rs
Normal file
@@ -0,0 +1,174 @@
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use clap::Parser;
|
||||
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use storage_broker::DEFAULT_LISTEN_ADDR;
|
||||
use tokio::time;
|
||||
|
||||
use tonic::Request;
|
||||
|
||||
const ABOUT: &str = r#"
|
||||
A simple benchmarking tool for storage_broker. Creates specified number of per
|
||||
timeline publishers and subscribers; each publisher continiously sends
|
||||
messages, subscribers read them. Each second the tool outputs number of
|
||||
messages summed across all subscribers and min number of messages
|
||||
recevied by single subscriber.
|
||||
|
||||
For example,
|
||||
cargo build -r -p storage_broker && target/release/storage_broker
|
||||
cargo bench --bench rps -- -s 1 -p 1
|
||||
"#;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author, version, about = ABOUT)]
|
||||
struct Args {
|
||||
/// Number of publishers
|
||||
#[clap(short = 'p', long, value_parser, default_value_t = 1)]
|
||||
num_pubs: u64,
|
||||
/// Number of subscribers
|
||||
#[clap(short = 's', long, value_parser, default_value_t = 1)]
|
||||
num_subs: u64,
|
||||
// Fake value to satisfy `cargo bench` passing it.
|
||||
#[clap(long)]
|
||||
bench: bool,
|
||||
}
|
||||
|
||||
async fn progress_reporter(counters: Vec<Arc<AtomicU64>>) {
|
||||
let mut interval = time::interval(Duration::from_millis(1000));
|
||||
let mut c_old = counters.iter().map(|c| c.load(Ordering::Relaxed)).sum();
|
||||
let mut c_min_old = counters
|
||||
.iter()
|
||||
.map(|c| c.load(Ordering::Relaxed))
|
||||
.min()
|
||||
.unwrap_or(0);
|
||||
let mut started_at = None;
|
||||
let mut skipped: u64 = 0;
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let c_new = counters.iter().map(|c| c.load(Ordering::Relaxed)).sum();
|
||||
let c_min_new = counters
|
||||
.iter()
|
||||
.map(|c| c.load(Ordering::Relaxed))
|
||||
.min()
|
||||
.unwrap_or(0);
|
||||
if c_new > 0 && started_at.is_none() {
|
||||
started_at = Some(Instant::now());
|
||||
skipped = c_new;
|
||||
}
|
||||
let avg_rps = started_at.map(|s| {
|
||||
let dur = s.elapsed();
|
||||
let dur_secs = dur.as_secs() as f64 + (dur.subsec_millis() as f64) / 1000.0;
|
||||
let avg_rps = (c_new - skipped) as f64 / dur_secs;
|
||||
(dur, avg_rps)
|
||||
});
|
||||
println!(
|
||||
"sum rps {}, min rps {} total {}, total min {}, duration, avg sum rps {:?}",
|
||||
c_new - c_old,
|
||||
c_min_new - c_min_old,
|
||||
c_new,
|
||||
c_min_new,
|
||||
avg_rps
|
||||
);
|
||||
c_old = c_new;
|
||||
c_min_old = c_min_new;
|
||||
}
|
||||
}
|
||||
|
||||
fn tli_from_u64(i: u64) -> Vec<u8> {
|
||||
let mut timeline_id = vec![0xFF; 8];
|
||||
timeline_id.extend_from_slice(&i.to_be_bytes());
|
||||
timeline_id
|
||||
}
|
||||
|
||||
async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => BrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR))
|
||||
.await
|
||||
.unwrap(),
|
||||
};
|
||||
|
||||
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
|
||||
tenant_id: vec![0xFF; 16],
|
||||
timeline_id: tli_from_u64(i),
|
||||
});
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
subscription_key: Some(key),
|
||||
};
|
||||
let mut stream = client
|
||||
.subscribe_safekeeper_info(request)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
|
||||
while let Some(_feature) = stream.message().await.unwrap() {
|
||||
counter.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => BrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR))
|
||||
.await
|
||||
.unwrap(),
|
||||
};
|
||||
let mut counter: u64 = 0;
|
||||
|
||||
// create stream producing new values
|
||||
let outbound = async_stream::stream! {
|
||||
loop {
|
||||
let info = SafekeeperTimelineInfo {
|
||||
safekeeper_id: 1,
|
||||
tenant_timeline_id: Some(ProtoTenantTimelineId {
|
||||
tenant_id: vec![0xFF; 16],
|
||||
timeline_id: tli_from_u64(counter % n_keys),
|
||||
}),
|
||||
last_log_term: 0,
|
||||
flush_lsn: counter,
|
||||
commit_lsn: 2,
|
||||
backup_lsn: 3,
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
};
|
||||
counter += 1;
|
||||
yield info;
|
||||
}
|
||||
};
|
||||
let response = client.publish_safekeeper_info(Request::new(outbound)).await;
|
||||
println!("pub response is {:?}", response);
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let args = Args::parse();
|
||||
|
||||
let mut counters = Vec::with_capacity(args.num_subs as usize);
|
||||
for _ in 0..args.num_subs {
|
||||
counters.push(Arc::new(AtomicU64::new(0)));
|
||||
}
|
||||
let h = tokio::spawn(progress_reporter(counters.clone()));
|
||||
|
||||
let c = BrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for i in 0..args.num_subs {
|
||||
let c = Some(c.clone());
|
||||
tokio::spawn(subscribe(c, counters[i as usize].clone(), i));
|
||||
}
|
||||
for _i in 0..args.num_pubs {
|
||||
let c = None;
|
||||
tokio::spawn(publish(c, args.num_subs as u64));
|
||||
}
|
||||
|
||||
h.await?;
|
||||
Ok(())
|
||||
}
|
||||
7
storage_broker/build.rs
Normal file
7
storage_broker/build.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Generate code to deterministic location to make finding it easier.
|
||||
tonic_build::configure()
|
||||
.out_dir("proto/") // put generated code to proto/
|
||||
.compile(&["proto/broker.proto"], &["proto/"])?;
|
||||
Ok(())
|
||||
}
|
||||
2
storage_broker/proto/.gitignore
vendored
Normal file
2
storage_broker/proto/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
# protobuf generated code
|
||||
storage_broker.rs
|
||||
44
storage_broker/proto/broker.proto
Normal file
44
storage_broker/proto/broker.proto
Normal file
@@ -0,0 +1,44 @@
|
||||
syntax = "proto3";
|
||||
|
||||
import "google/protobuf/empty.proto";
|
||||
|
||||
package storage_broker;
|
||||
|
||||
service BrokerService {
|
||||
// Subscribe to safekeeper updates.
|
||||
rpc SubscribeSafekeeperInfo(SubscribeSafekeeperInfoRequest) returns (stream SafekeeperTimelineInfo) {};
|
||||
|
||||
// Publish safekeeper updates.
|
||||
rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {};
|
||||
}
|
||||
|
||||
message SubscribeSafekeeperInfoRequest {
|
||||
oneof subscription_key {
|
||||
google.protobuf.Empty all = 1; // subscribe to everything
|
||||
TenantTimelineId tenant_timeline_id = 2; // subscribe to specific timeline
|
||||
}
|
||||
}
|
||||
|
||||
message SafekeeperTimelineInfo {
|
||||
uint64 safekeeper_id = 1;
|
||||
TenantTimelineId tenant_timeline_id = 2;
|
||||
// Term of the last entry.
|
||||
uint64 last_log_term = 3;
|
||||
// LSN of the last record.
|
||||
uint64 flush_lsn = 4;
|
||||
// Up to which LSN safekeeper regards its WAL as committed.
|
||||
uint64 commit_lsn = 5;
|
||||
// LSN up to which safekeeper has backed WAL.
|
||||
uint64 backup_lsn = 6;
|
||||
// LSN of last checkpoint uploaded by pageserver.
|
||||
uint64 remote_consistent_lsn = 7;
|
||||
uint64 peer_horizon_lsn = 8;
|
||||
uint64 local_start_lsn = 9;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
}
|
||||
|
||||
message TenantTimelineId {
|
||||
bytes tenant_id = 1;
|
||||
bytes timeline_id = 2;
|
||||
}
|
||||
557
storage_broker/src/bin/storage_broker.rs
Normal file
557
storage_broker/src/bin/storage_broker.rs
Normal file
@@ -0,0 +1,557 @@
|
||||
//! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage
|
||||
//! nodes messaging.
|
||||
//!
|
||||
//! Subscriptions to 1) single timeline 2) all timelines are possible. We could
|
||||
//! add subscription to the set of timelines to save grpc streams, but testing
|
||||
//! shows many individual streams is also ok.
|
||||
//!
|
||||
//! Message is dropped if subscriber can't consume it, not affecting other
|
||||
//! subscribers.
|
||||
//!
|
||||
//! Only safekeeper message is supported, but it is not hard to add something
|
||||
//! else with generics.
|
||||
use clap::{command, Parser};
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use hyper::server::conn::AddrStream;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, StatusCode};
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::time;
|
||||
use tonic::codegen::Service;
|
||||
use tonic::transport::server::Connected;
|
||||
use tonic::Code;
|
||||
use tonic::{Request, Response, Status};
|
||||
use tracing::*;
|
||||
|
||||
use metrics::{Encoder, TextEncoder};
|
||||
use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
|
||||
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
|
||||
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
|
||||
use storage_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR};
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::logging::{self, LogFormat};
|
||||
use utils::project_git_version;
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
const DEFAULT_CHAN_SIZE: usize = 128;
|
||||
const DEFAULT_HTTP2_KEEPALIVE_INTERVAL: &str = "5000ms";
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
|
||||
struct Args {
|
||||
/// Endpoint to listen on.
|
||||
#[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
|
||||
listen_addr: SocketAddr,
|
||||
/// Size of the queue to the subscriber.
|
||||
#[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
|
||||
chan_size: usize,
|
||||
/// HTTP/2 keepalive interval.
|
||||
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HTTP2_KEEPALIVE_INTERVAL)]
|
||||
http2_keepalive_interval: Duration,
|
||||
/// Format for logging, either 'plain' or 'json'.
|
||||
#[arg(long, default_value = "plain")]
|
||||
log_format: String,
|
||||
}
|
||||
|
||||
type PubId = u64; // id of publisher for registering in maps
|
||||
type SubId = u64; // id of subscriber for registering in maps
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
enum SubscriptionKey {
|
||||
All,
|
||||
Timeline(TenantTimelineId),
|
||||
}
|
||||
|
||||
impl SubscriptionKey {
|
||||
// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
|
||||
pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
|
||||
match key {
|
||||
ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
|
||||
ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
|
||||
Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Channel to timeline subscribers.
|
||||
struct ChanToTimelineSub {
|
||||
chan: broadcast::Sender<SafekeeperTimelineInfo>,
|
||||
// Tracked separately to know when delete the shmem entry. receiver_count()
|
||||
// is unhandy for that as unregistering and dropping the receiver side
|
||||
// happens at different moments.
|
||||
num_subscribers: u64,
|
||||
}
|
||||
|
||||
struct SharedState {
|
||||
next_pub_id: PubId,
|
||||
num_pubs: i64,
|
||||
next_sub_id: SubId,
|
||||
num_subs_to_timelines: i64,
|
||||
chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
|
||||
num_subs_to_all: i64,
|
||||
chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
pub fn new(chan_size: usize) -> Self {
|
||||
SharedState {
|
||||
next_pub_id: 0,
|
||||
num_pubs: 0,
|
||||
next_sub_id: 0,
|
||||
num_subs_to_timelines: 0,
|
||||
chans_to_timeline_subs: HashMap::new(),
|
||||
num_subs_to_all: 0,
|
||||
chan_to_all_subs: broadcast::channel(chan_size).0,
|
||||
}
|
||||
}
|
||||
|
||||
// Register new publisher.
|
||||
pub fn register_publisher(&mut self) -> PubId {
|
||||
let pub_id = self.next_pub_id;
|
||||
self.next_pub_id += 1;
|
||||
self.num_pubs += 1;
|
||||
NUM_PUBS.set(self.num_pubs);
|
||||
pub_id
|
||||
}
|
||||
|
||||
// Unregister publisher.
|
||||
pub fn unregister_publisher(&mut self) {
|
||||
self.num_pubs -= 1;
|
||||
NUM_PUBS.set(self.num_pubs);
|
||||
}
|
||||
|
||||
// Register new subscriber.
|
||||
pub fn register_subscriber(
|
||||
&mut self,
|
||||
sub_key: SubscriptionKey,
|
||||
chan_size: usize,
|
||||
) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
|
||||
let sub_id = self.next_sub_id;
|
||||
self.next_sub_id += 1;
|
||||
let sub_rx = match sub_key {
|
||||
SubscriptionKey::All => {
|
||||
self.num_subs_to_all += 1;
|
||||
NUM_SUBS_ALL.set(self.num_subs_to_all);
|
||||
self.chan_to_all_subs.subscribe()
|
||||
}
|
||||
SubscriptionKey::Timeline(ttid) => {
|
||||
self.num_subs_to_timelines += 1;
|
||||
NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
|
||||
// Create new broadcast channel for this key, or subscriber to
|
||||
// the existing one.
|
||||
let chan_to_timeline_sub =
|
||||
self.chans_to_timeline_subs
|
||||
.entry(ttid)
|
||||
.or_insert(ChanToTimelineSub {
|
||||
chan: broadcast::channel(chan_size).0,
|
||||
num_subscribers: 0,
|
||||
});
|
||||
chan_to_timeline_sub.num_subscribers += 1;
|
||||
chan_to_timeline_sub.chan.subscribe()
|
||||
}
|
||||
};
|
||||
(sub_id, sub_rx)
|
||||
}
|
||||
|
||||
// Unregister the subscriber.
|
||||
pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
|
||||
match sub_key {
|
||||
SubscriptionKey::All => {
|
||||
self.num_subs_to_all -= 1;
|
||||
NUM_SUBS_ALL.set(self.num_subs_to_all);
|
||||
}
|
||||
SubscriptionKey::Timeline(ttid) => {
|
||||
self.num_subs_to_timelines -= 1;
|
||||
NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
|
||||
|
||||
// Remove from the map, destroying the channel, if we are the
|
||||
// last subscriber to this timeline.
|
||||
|
||||
// Missing entry is a bug; we must have registered.
|
||||
let chan_to_timeline_sub = self
|
||||
.chans_to_timeline_subs
|
||||
.get_mut(&ttid)
|
||||
.expect("failed to find sub entry in shmem during unregister");
|
||||
chan_to_timeline_sub.num_subscribers -= 1;
|
||||
if chan_to_timeline_sub.num_subscribers == 0 {
|
||||
self.chans_to_timeline_subs.remove(&ttid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SharedState wrapper.
|
||||
#[derive(Clone)]
|
||||
struct Registry {
|
||||
shared_state: Arc<RwLock<SharedState>>,
|
||||
chan_size: usize,
|
||||
}
|
||||
|
||||
impl Registry {
|
||||
// Register new publisher in shared state.
|
||||
pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
|
||||
let pub_id = self.shared_state.write().register_publisher();
|
||||
info!("publication started id={} addr={:?}", pub_id, remote_addr);
|
||||
Publisher {
|
||||
id: pub_id,
|
||||
registry: self.clone(),
|
||||
remote_addr,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unregister_publisher(&self, publisher: &Publisher) {
|
||||
self.shared_state.write().unregister_publisher();
|
||||
info!(
|
||||
"publication ended id={} addr={:?}",
|
||||
publisher.id, publisher.remote_addr
|
||||
);
|
||||
}
|
||||
|
||||
// Register new subscriber in shared state.
|
||||
pub fn register_subscriber(
|
||||
&self,
|
||||
sub_key: SubscriptionKey,
|
||||
remote_addr: SocketAddr,
|
||||
) -> Subscriber {
|
||||
let (sub_id, sub_rx) = self
|
||||
.shared_state
|
||||
.write()
|
||||
.register_subscriber(sub_key, self.chan_size);
|
||||
info!(
|
||||
"subscription started id={}, key={:?}, addr={:?}",
|
||||
sub_id, sub_key, remote_addr
|
||||
);
|
||||
Subscriber {
|
||||
id: sub_id,
|
||||
key: sub_key,
|
||||
sub_rx,
|
||||
registry: self.clone(),
|
||||
remote_addr,
|
||||
}
|
||||
}
|
||||
|
||||
// Unregister the subscriber
|
||||
pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
|
||||
self.shared_state
|
||||
.write()
|
||||
.unregister_subscriber(subscriber.key);
|
||||
info!(
|
||||
"subscription ended id={}, key={:?}, addr={:?}",
|
||||
subscriber.id, subscriber.key, subscriber.remote_addr
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Private subscriber state.
|
||||
struct Subscriber {
|
||||
id: SubId,
|
||||
key: SubscriptionKey,
|
||||
// Subscriber receives messages from publishers here.
|
||||
sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
|
||||
// to unregister itself from shared state in Drop
|
||||
registry: Registry,
|
||||
// for logging
|
||||
remote_addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl Drop for Subscriber {
|
||||
fn drop(&mut self) {
|
||||
self.registry.unregister_subscriber(self);
|
||||
}
|
||||
}
|
||||
|
||||
// Private publisher state
|
||||
struct Publisher {
|
||||
id: PubId,
|
||||
registry: Registry,
|
||||
// for logging
|
||||
remote_addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl Publisher {
|
||||
// Send msg to relevant subscribers.
|
||||
pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
|
||||
// send message to subscribers for everything
|
||||
let shared_state = self.registry.shared_state.read();
|
||||
// Err means there is no subscribers, it is fine.
|
||||
shared_state.chan_to_all_subs.send(msg.clone()).ok();
|
||||
|
||||
// send message to per timeline subscribers
|
||||
let ttid =
|
||||
parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
|
||||
Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
|
||||
})?)?;
|
||||
if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
|
||||
// Err can't happen here, as tx is destroyed only after removing
|
||||
// from the map the last subscriber along with tx.
|
||||
subs.chan
|
||||
.send(msg.clone())
|
||||
.expect("rx is still in the map with zero subscribers");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Publisher {
|
||||
fn drop(&mut self) {
|
||||
self.registry.unregister_publisher(self);
|
||||
}
|
||||
}
|
||||
|
||||
struct Broker {
|
||||
registry: Registry,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl BrokerService for Broker {
|
||||
async fn publish_safekeeper_info(
|
||||
&self,
|
||||
request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
|
||||
) -> Result<Response<()>, Status> {
|
||||
let remote_addr = request
|
||||
.remote_addr()
|
||||
.expect("TCPConnectInfo inserted by handler");
|
||||
let mut publisher = self.registry.register_publisher(remote_addr);
|
||||
|
||||
let mut stream = request.into_inner();
|
||||
|
||||
loop {
|
||||
match stream.next().await {
|
||||
Some(Ok(msg)) => publisher.send_msg(&msg)?,
|
||||
Some(Err(e)) => return Err(e), // grpc error from the stream
|
||||
None => break, // closed stream
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Response::new(()))
|
||||
}
|
||||
|
||||
type SubscribeSafekeeperInfoStream =
|
||||
Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
|
||||
|
||||
async fn subscribe_safekeeper_info(
|
||||
&self,
|
||||
request: Request<SubscribeSafekeeperInfoRequest>,
|
||||
) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
|
||||
let remote_addr = request
|
||||
.remote_addr()
|
||||
.expect("TCPConnectInfo inserted by handler");
|
||||
let proto_key = request
|
||||
.into_inner()
|
||||
.subscription_key
|
||||
.ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
|
||||
let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
|
||||
let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
|
||||
|
||||
// transform rx into stream with item = Result, as method result demands
|
||||
let output = async_stream::try_stream! {
|
||||
let mut warn_interval = time::interval(Duration::from_millis(1000));
|
||||
let mut missed_msgs: u64 = 0;
|
||||
loop {
|
||||
match subscriber.sub_rx.recv().await {
|
||||
Ok(info) => yield info,
|
||||
Err(RecvError::Lagged(skipped_msg)) => {
|
||||
missed_msgs += skipped_msg;
|
||||
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
|
||||
warn!("dropped {} messages, channel is full", missed_msgs);
|
||||
missed_msgs = 0;
|
||||
}
|
||||
}
|
||||
Err(RecvError::Closed) => {
|
||||
// can't happen, we never drop the channel while there is a subscriber
|
||||
Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Response::new(
|
||||
Box::pin(output) as Self::SubscribeSafekeeperInfoStream
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
// We serve only metrics and healthcheck through http1.
|
||||
async fn http1_handler(
|
||||
req: hyper::Request<hyper::body::Body>,
|
||||
) -> Result<hyper::Response<Body>, Infallible> {
|
||||
let resp = match (req.method(), req.uri().path()) {
|
||||
(&Method::GET, "/metrics") => {
|
||||
let mut buffer = vec![];
|
||||
let metrics = metrics::gather();
|
||||
let encoder = TextEncoder::new();
|
||||
encoder.encode(&metrics, &mut buffer).unwrap();
|
||||
|
||||
hyper::Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(Body::from(buffer))
|
||||
.unwrap()
|
||||
}
|
||||
(&Method::GET, "/status") => hyper::Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
_ => hyper::Response::builder()
|
||||
.status(StatusCode::NOT_FOUND)
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
};
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let args = Args::parse();
|
||||
|
||||
logging::init(LogFormat::from_config(&args.log_format)?)?;
|
||||
info!("version: {GIT_VERSION}");
|
||||
|
||||
let registry = Registry {
|
||||
shared_state: Arc::new(RwLock::new(SharedState::new(args.chan_size))),
|
||||
chan_size: args.chan_size,
|
||||
};
|
||||
let storage_broker_impl = Broker {
|
||||
registry: registry.clone(),
|
||||
};
|
||||
let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
|
||||
|
||||
info!("listening on {}", &args.listen_addr);
|
||||
|
||||
// grpc is served along with http1 for metrics on a single port, hence we
|
||||
// don't use tonic's Server.
|
||||
hyper::Server::bind(&args.listen_addr)
|
||||
.http2_keep_alive_interval(Some(args.http2_keepalive_interval))
|
||||
.serve(make_service_fn(move |conn: &AddrStream| {
|
||||
let storage_broker_server_cloned = storage_broker_server.clone();
|
||||
let connect_info = conn.connect_info();
|
||||
async move {
|
||||
Ok::<_, Infallible>(service_fn(move |mut req| {
|
||||
// That's what tonic's MakeSvc.call does to pass conninfo to
|
||||
// the request handler (and where its request.remote_addr()
|
||||
// expects it to find).
|
||||
req.extensions_mut().insert(connect_info.clone());
|
||||
|
||||
// Technically this second clone is not needed, but consume
|
||||
// by async block is apparently unavoidable. BTW, error
|
||||
// message is enigmatic, see
|
||||
// https://github.com/rust-lang/rust/issues/68119
|
||||
//
|
||||
// We could get away without async block at all, but then we
|
||||
// need to resort to futures::Either to merge the result,
|
||||
// which doesn't caress an eye as well.
|
||||
let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
|
||||
async move {
|
||||
if req.headers().get("content-type").map(|x| x.as_bytes())
|
||||
== Some(b"application/grpc")
|
||||
{
|
||||
let res_resp = storage_broker_server_svc.call(req).await;
|
||||
// Grpc and http1 handlers have slightly different
|
||||
// Response types: it is UnsyncBoxBody for the
|
||||
// former one (not sure why) and plain hyper::Body
|
||||
// for the latter. Both implement HttpBody though,
|
||||
// and EitherBody is used to merge them.
|
||||
res_resp.map(|resp| resp.map(EitherBody::Left))
|
||||
} else {
|
||||
let res_resp = http1_handler(req).await;
|
||||
res_resp.map(|resp| resp.map(EitherBody::Right))
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
}))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use tokio::sync::broadcast::error::TryRecvError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
|
||||
SafekeeperTimelineInfo {
|
||||
safekeeper_id: 1,
|
||||
tenant_timeline_id: Some(ProtoTenantTimelineId {
|
||||
tenant_id: vec![0x00; 16],
|
||||
timeline_id,
|
||||
}),
|
||||
last_log_term: 0,
|
||||
flush_lsn: 1,
|
||||
commit_lsn: 2,
|
||||
backup_lsn: 3,
|
||||
remote_consistent_lsn: 4,
|
||||
peer_horizon_lsn: 5,
|
||||
safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn tli_from_u64(i: u64) -> Vec<u8> {
|
||||
let mut timeline_id = vec![0xFF; 8];
|
||||
timeline_id.extend_from_slice(&i.to_be_bytes());
|
||||
timeline_id
|
||||
}
|
||||
|
||||
fn mock_addr() -> SocketAddr {
|
||||
"127.0.0.1:8080".parse().unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_registry() {
|
||||
let registry = Registry {
|
||||
shared_state: Arc::new(RwLock::new(SharedState::new(16))),
|
||||
chan_size: 16,
|
||||
};
|
||||
|
||||
// subscribe to timeline 2
|
||||
let ttid_2 = TenantTimelineId {
|
||||
tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
|
||||
timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
|
||||
};
|
||||
let sub_key_2 = SubscriptionKey::Timeline(ttid_2);
|
||||
let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr());
|
||||
let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr());
|
||||
|
||||
// send two messages with different keys
|
||||
let msg_1 = msg(tli_from_u64(1));
|
||||
let msg_2 = msg(tli_from_u64(2));
|
||||
let mut publisher = registry.register_publisher(mock_addr());
|
||||
publisher.send_msg(&msg_1).expect("failed to send msg");
|
||||
publisher.send_msg(&msg_2).expect("failed to send msg");
|
||||
|
||||
// msg with key 2 should arrive to subscriber_2
|
||||
assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2);
|
||||
|
||||
// but nothing more
|
||||
assert_eq!(
|
||||
subscriber_2.sub_rx.try_recv().unwrap_err(),
|
||||
TryRecvError::Empty
|
||||
);
|
||||
|
||||
// subscriber_all should receive both messages
|
||||
assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1);
|
||||
assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2);
|
||||
assert_eq!(
|
||||
subscriber_all.sub_rx.try_recv().unwrap_err(),
|
||||
TryRecvError::Empty
|
||||
);
|
||||
}
|
||||
}
|
||||
108
storage_broker/src/lib.rs
Normal file
108
storage_broker/src/lib.rs
Normal file
@@ -0,0 +1,108 @@
|
||||
use hyper::body::HttpBody;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tonic::codegen::StdError;
|
||||
use tonic::{transport::Channel, Code, Status};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
use proto::{
|
||||
broker_service_client::BrokerServiceClient, TenantTimelineId as ProtoTenantTimelineId,
|
||||
};
|
||||
|
||||
// Code generated by protobuf.
|
||||
pub mod proto {
|
||||
include!("../proto/storage_broker.rs");
|
||||
}
|
||||
|
||||
pub mod metrics;
|
||||
|
||||
// Re-exports to avoid direct tonic dependency in user crates.
|
||||
pub use tonic::Request;
|
||||
pub use tonic::Streaming;
|
||||
|
||||
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
|
||||
// NeonBrokerClient charged with tonic provided Channel transport; helps to
|
||||
// avoid depending on tonic directly in user crates.
|
||||
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
|
||||
impl BrokerClientChannel {
|
||||
/// Create a new client to the given endpoint, but don't actually connect until the first request.
|
||||
pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
|
||||
where
|
||||
D: std::convert::TryInto<tonic::transport::Endpoint>,
|
||||
D::Error: Into<StdError>,
|
||||
{
|
||||
let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
|
||||
Ok(Self::new(conn))
|
||||
}
|
||||
}
|
||||
|
||||
// parse variable length bytes from protobuf
|
||||
pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
|
||||
let tenant_id = TenantId::from_slice(&proto_ttid.tenant_id)
|
||||
.map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?;
|
||||
let timeline_id = TimelineId::from_slice(&proto_ttid.timeline_id).map_err(|e| {
|
||||
Status::new(
|
||||
Code::InvalidArgument,
|
||||
format!("malformed timeline_id: {}", e),
|
||||
)
|
||||
})?;
|
||||
Ok(TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
}
|
||||
|
||||
// These several usages don't justify anyhow dependency, though it would work as
|
||||
// well.
|
||||
type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
// Provides impl HttpBody for two different types implementing it. Inspired by
|
||||
// https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs
|
||||
pub enum EitherBody<A, B> {
|
||||
Left(A),
|
||||
Right(B),
|
||||
}
|
||||
|
||||
impl<A, B> HttpBody for EitherBody<A, B>
|
||||
where
|
||||
A: HttpBody + Send + Unpin,
|
||||
B: HttpBody<Data = A::Data> + Send + Unpin,
|
||||
A::Error: Into<AnyError>,
|
||||
B::Error: Into<AnyError>,
|
||||
{
|
||||
type Data = A::Data;
|
||||
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
fn is_end_stream(&self) -> bool {
|
||||
match self {
|
||||
EitherBody::Left(b) => b.is_end_stream(),
|
||||
EitherBody::Right(b) => b.is_end_stream(),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_data(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
|
||||
match self.get_mut() {
|
||||
EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
|
||||
EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_trailers(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
|
||||
match self.get_mut() {
|
||||
EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
|
||||
EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn map_option_err<T, U: Into<AnyError>>(err: Option<Result<T, U>>) -> Option<Result<T, AnyError>> {
|
||||
err.map(|e| e.map_err(Into::into))
|
||||
}
|
||||
25
storage_broker/src/metrics.rs
Normal file
25
storage_broker/src/metrics.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
//! Broker metrics.
|
||||
|
||||
use metrics::{register_int_gauge, IntGauge};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
pub static NUM_PUBS: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!("storage_broker_active_publishers", "Number of publications")
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub static NUM_SUBS_TIMELINE: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"storage_broker_per_timeline_active_subscribers",
|
||||
"Number of subsciptions to particular tenant timeline id"
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
pub static NUM_SUBS_ALL: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"storage_broker_all_keys_active_subscribers",
|
||||
"Number of subsciptions to all keys"
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
@@ -17,13 +17,13 @@ ahash = { version = "0.7", features = ["std"] }
|
||||
anyhow = { version = "1", features = ["backtrace", "std"] }
|
||||
bytes = { version = "1", features = ["serde", "std"] }
|
||||
chrono = { version = "0.4", features = ["clock", "iana-time-zone", "js-sys", "oldtime", "serde", "std", "time", "wasm-bindgen", "wasmbind", "winapi"] }
|
||||
clap = { version = "4", features = ["color", "error-context", "help", "std", "string", "suggestions", "usage"] }
|
||||
clap = { version = "4", features = ["color", "derive", "error-context", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
|
||||
either = { version = "1", features = ["use_std"] }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
|
||||
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
|
||||
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
|
||||
futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
|
||||
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "std"] }
|
||||
@@ -33,7 +33,8 @@ nom = { version = "7", features = ["alloc", "std"] }
|
||||
num-bigint = { version = "0.4", features = ["std"] }
|
||||
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
|
||||
prost = { version = "0.10", features = ["prost-derive", "std"] }
|
||||
prost-93f6ce9d446188ac = { package = "prost", version = "0.10", features = ["prost-derive", "std"] }
|
||||
prost-a6292c17cd707f01 = { package = "prost", version = "0.11", features = ["prost-derive", "std"] }
|
||||
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
|
||||
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
@@ -58,7 +59,8 @@ libc = { version = "0.2", features = ["extra_traits", "std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
|
||||
memchr = { version = "2", features = ["std"] }
|
||||
nom = { version = "7", features = ["alloc", "std"] }
|
||||
prost = { version = "0.10", features = ["prost-derive", "std"] }
|
||||
prost-93f6ce9d446188ac = { package = "prost", version = "0.10", features = ["prost-derive", "std"] }
|
||||
prost-a6292c17cd707f01 = { package = "prost", version = "0.11", features = ["prost-derive", "std"] }
|
||||
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }
|
||||
|
||||
Reference in New Issue
Block a user