From 2d42f843892d26174164927ec6ebe1f52aa7b7d2 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 16 Nov 2022 08:04:51 +0400 Subject: [PATCH] Add storage_broker binary. Which ought to replace etcd. This patch only adds the binary and adjusts Dockerfile to include it; subsequent ones will add deploy of helm chart and the actual replacement. It is a simple and fast pub-sub message bus. In this patch only safekeeper message is supported, but others can be easily added. Compilation now requires protoc to be installed. Installing protobuf-compiler package is fine for Debian/Ubuntu. ref https://github.com/neondatabase/neon/pull/2733 https://github.com/neondatabase/neon/issues/2394 --- .dockerignore | 1 + .github/ansible/get_binaries.sh | 1 + .github/workflows/codestyle.yml | 4 +- Cargo.lock | 188 +++++++- Cargo.toml | 1 + Dockerfile | 3 +- README.md | 4 +- docs/sourcetree.md | 5 + docs/storage_broker.md | 27 ++ libs/utils/src/id.rs | 20 + storage_broker/Cargo.toml | 37 ++ storage_broker/benches/rps.rs | 174 +++++++ storage_broker/build.rs | 7 + storage_broker/proto/.gitignore | 2 + storage_broker/proto/broker.proto | 44 ++ storage_broker/src/bin/storage_broker.rs | 557 +++++++++++++++++++++++ storage_broker/src/lib.rs | 108 +++++ storage_broker/src/metrics.rs | 25 + workspace_hack/Cargo.toml | 10 +- 19 files changed, 1198 insertions(+), 20 deletions(-) create mode 100644 docs/storage_broker.md create mode 100644 storage_broker/Cargo.toml create mode 100644 storage_broker/benches/rps.rs create mode 100644 storage_broker/build.rs create mode 100644 storage_broker/proto/.gitignore create mode 100644 storage_broker/proto/broker.proto create mode 100644 storage_broker/src/bin/storage_broker.rs create mode 100644 storage_broker/src/lib.rs create mode 100644 storage_broker/src/metrics.rs diff --git a/.dockerignore b/.dockerignore index 92eb4f24de..2bbff86100 100644 --- a/.dockerignore +++ b/.dockerignore @@ -14,6 +14,7 @@ !pgxn/ !proxy/ !safekeeper/ +!storage_broker/ !vendor/postgres-v14/ !vendor/postgres-v15/ !workspace_hack/ diff --git a/.github/ansible/get_binaries.sh b/.github/ansible/get_binaries.sh index 9d2d0926f5..632fa03c29 100755 --- a/.github/ansible/get_binaries.sh +++ b/.github/ansible/get_binaries.sh @@ -25,6 +25,7 @@ mkdir neon_install/bin/ docker cp ${ID}:/usr/local/bin/pageserver neon_install/bin/ docker cp ${ID}:/usr/local/bin/pageserver_binutils neon_install/bin/ docker cp ${ID}:/usr/local/bin/safekeeper neon_install/bin/ +docker cp ${ID}:/usr/local/bin/neon_broker neon_install/bin/ docker cp ${ID}:/usr/local/bin/proxy neon_install/bin/ docker cp ${ID}:/usr/local/v14/bin/ neon_install/v14/bin/ docker cp ${ID}:/usr/local/v15/bin/ neon_install/v15/bin/ diff --git a/.github/workflows/codestyle.yml b/.github/workflows/codestyle.yml index bb000efbac..01fef71c9a 100644 --- a/.github/workflows/codestyle.yml +++ b/.github/workflows/codestyle.yml @@ -48,11 +48,11 @@ jobs: if: matrix.os == 'ubuntu-latest' run: | sudo apt update - sudo apt install build-essential libreadline-dev zlib1g-dev flex bison libseccomp-dev libssl-dev + sudo apt install build-essential libreadline-dev zlib1g-dev flex bison libseccomp-dev libssl-dev protobuf-compiler - name: Install macOS postgres dependencies if: matrix.os == 'macos-latest' - run: brew install flex bison openssl + run: brew install flex bison openssl protobuf - name: Set pg 14 revision for caching id: pg_v14_rev diff --git a/Cargo.lock b/Cargo.lock index c112c05188..83ed5f9263 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -457,11 +457,26 @@ checksum = "6bf8832993da70a4c6d13c581f4463c2bdda27b9bf1c5498dc4365543abe6d6f" dependencies = [ "atty", "bitflags", + "clap_derive", "clap_lex 0.3.0", + "once_cell", "strsim", "termcolor", ] +[[package]] +name = "clap_derive" +version = "4.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c42f169caba89a7d512b5418b09864543eeb4d497416c917d7137863bd2076ad" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "clap_lex" version = "0.2.4" @@ -1005,11 +1020,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fb8664f6ea68aba5503d42dd1be786b0f1bd9b7972e7f40208c83ef74db91bf" dependencies = [ "http", - "prost", + "prost 0.10.4", "tokio", "tokio-stream", - "tonic", - "tonic-build", + "tonic 0.7.2", + "tonic-build 0.7.2", "tower", "tower-service", ] @@ -2464,6 +2479,30 @@ dependencies = [ "syn", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -2515,7 +2554,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71adf41db68aa0daaefc69bb30bcd68ded9b9abaad5d1fbb6304c4fb390e083e" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.10.1", +] + +[[package]] +name = "prost" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0841812012b2d4a6145fae9a6af1534873c32aa67fff26bd09f8fa42c83f95a" +dependencies = [ + "bytes", + "prost-derive 0.11.2", ] [[package]] @@ -2533,13 +2582,35 @@ dependencies = [ "log", "multimap", "petgraph", - "prost", - "prost-types", + "prost 0.10.4", + "prost-types 0.10.1", "regex", "tempfile", "which", ] +[[package]] +name = "prost-build" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d8b442418ea0822409d9e7d047cbf1e7e9e1760b172bf9982cf29d517c93511" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prettyplease", + "prost 0.11.2", + "prost-types 0.11.2", + "regex", + "syn", + "tempfile", + "which", +] + [[package]] name = "prost-derive" version = "0.10.1" @@ -2553,6 +2624,19 @@ dependencies = [ "syn", ] +[[package]] +name = "prost-derive" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "164ae68b6587001ca506d3bf7f1000bfa248d0e1217b618108fba4ec1d0cc306" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "prost-types" version = "0.10.1" @@ -2560,7 +2644,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68" dependencies = [ "bytes", - "prost", + "prost 0.10.4", +] + +[[package]] +name = "prost-types" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "747761bc3dc48f9a34553bf65605cf6cb6288ba219f3450b4275dbd81539551a" +dependencies = [ + "bytes", + "prost 0.11.2", ] [[package]] @@ -3422,6 +3516,32 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "storage_broker" +version = "0.1.0" +dependencies = [ + "async-stream", + "bytes", + "clap 4.0.15", + "futures", + "futures-core", + "futures-util", + "git-version", + "humantime", + "hyper", + "metrics", + "once_cell", + "parking_lot 0.12.1", + "prost 0.11.2", + "tokio", + "tokio-stream", + "tonic 0.8.2", + "tonic-build 0.8.2", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "str_stack" version = "0.1.0" @@ -3822,8 +3942,40 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", - "prost-derive", + "prost 0.10.4", + "prost-derive 0.10.1", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost 0.11.2", + "prost-derive 0.11.2", "tokio", "tokio-stream", "tokio-util", @@ -3842,7 +3994,20 @@ checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1" dependencies = [ "prettyplease", "proc-macro2", - "prost-build", + "prost-build 0.10.4", + "quote", + "syn", +] + +[[package]] +name = "tonic-build" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c6fd7c2581e36d63388a9e04c350c21beb7a8b059580b2e93993c526899ddc" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build 0.11.2", "quote", "syn", ] @@ -4401,7 +4566,8 @@ dependencies = [ "num-bigint", "num-integer", "num-traits", - "prost", + "prost 0.10.4", + "prost 0.11.2", "rand", "regex", "regex-syntax", diff --git a/Cargo.toml b/Cargo.toml index 0d73710bbb..2f73215d3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "pageserver", "proxy", "safekeeper", + "storage_broker", "workspace_hack", "libs/*", ] diff --git a/Dockerfile b/Dockerfile index b0d934d480..f0244fa8d3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -44,7 +44,7 @@ COPY . . # Show build caching stats to check if it was used in the end. # Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats. RUN set -e \ -&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin proxy --locked --release \ +&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin storage_broker --bin proxy --locked --release \ && cachepot -s # Build final image @@ -67,6 +67,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin +COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/ diff --git a/README.md b/README.md index 770c24d11f..cda36008d8 100644 --- a/README.md +++ b/README.md @@ -35,12 +35,12 @@ Pageserver consists of: * On Ubuntu or Debian, this set of packages should be sufficient to build the code: ```bash apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \ -libssl-dev clang pkg-config libpq-dev etcd cmake postgresql-client +libssl-dev clang pkg-config libpq-dev etcd cmake postgresql-client protobuf-compiler ``` * On Fedora, these packages are needed: ```bash dnf install flex bison readline-devel zlib-devel openssl-devel \ - libseccomp-devel perl clang cmake etcd postgresql postgresql-contrib + libseccomp-devel perl clang cmake etcd postgresql postgresql-contrib protobuf-compiler ``` 2. [Install Rust](https://www.rust-lang.org/tools/install) diff --git a/docs/sourcetree.md b/docs/sourcetree.md index 4ea83dd068..309f5a6966 100644 --- a/docs/sourcetree.md +++ b/docs/sourcetree.md @@ -2,6 +2,11 @@ Below you will find a brief overview of each subdir in the source tree in alphabetical order. +`storage_broker`: + +Neon storage broker, providing messaging between safekeepers and pageservers. +[storage_broker.md](./storage_broker.md) + `/control_plane`: Local control plane. diff --git a/docs/storage_broker.md b/docs/storage_broker.md new file mode 100644 index 0000000000..16329ceaa5 --- /dev/null +++ b/docs/storage_broker.md @@ -0,0 +1,27 @@ +# Storage broker + +Storage broker targets two issues: +- Allowing safekeepers and pageservers learn which nodes also hold their + timelines, and timeline statuses there. +- Avoiding O(n^2) connections between storage nodes while doing so. + +This is used +- By pageservers to determine the most advanced and alive safekeeper to pull WAL from. +- By safekeepers to synchronize on the timeline: advance + `remote_consistent_lsn`, `backup_lsn`, choose who offloads WAL to s3. + +Technically, it is a simple stateless pub-sub message broker based on tonic +(grpc) making multiplexing easy. Since it is stateless, fault tolerance can be +provided by k8s; there is no built in replication support, though it is not hard +to add. + +Currently, the only message is `SafekeeperTimelineInfo`. Each safekeeper, for +each active timeline, once in a while pushes timeline status to the broker. +Other nodes subscribe and receive this info, using it per above. + +Broker serves /metrics on the same port as grpc service. + +grpcurl can be used to check which values are currently being pushed: +``` +grpcurl -proto broker/proto/broker.proto -d '{"all":{}}' -plaintext localhost:50051 storage_broker.NeonBroker/SubscribeSafekeeperInfo +``` diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index 7ce324614d..f84bcb793f 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -3,6 +3,13 @@ use std::{fmt, str::FromStr}; use hex::FromHex; use rand::Rng; use serde::{Deserialize, Serialize}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum IdError { + #[error("invalid id length {0}")] + SliceParseError(usize), +} /// Neon ID is a 128-bit random ID. /// Used to represent various identifiers. Provides handy utility methods and impls. @@ -22,6 +29,15 @@ impl Id { Id::from(arr) } + pub fn from_slice(src: &[u8]) -> Result { + if src.len() != 16 { + return Err(IdError::SliceParseError(src.len())); + } + let mut id_array = [0u8; 16]; + id_array.copy_from_slice(src); + Ok(id_array.into()) + } + pub fn as_arr(&self) -> [u8; 16] { self.0 } @@ -100,6 +116,10 @@ macro_rules! id_newtype { $t(Id::get_from_buf(buf)) } + pub fn from_slice(src: &[u8]) -> Result<$t, IdError> { + Ok($t(Id::from_slice(src)?)) + } + pub fn as_arr(&self) -> [u8; 16] { self.0.as_arr() } diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml new file mode 100644 index 0000000000..843fc53f36 --- /dev/null +++ b/storage_broker/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "storage_broker" +version = "0.1.0" +edition = "2021" + +[features] +bench = [] + +[dependencies] +async-stream = "0.3" +bytes = "1.0" +clap = { version = "4.0", features = ["derive"] } +futures = "0.3" +futures-core = "0.3" +futures-util = "0.3" +git-version = "0.3.5" +humantime = "2.1.0" +hyper = {version = "0.14.14", features = ["full"]} +once_cell = "1.13.0" +parking_lot = "0.12" +prost = "0.11" +tonic = "0.8" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +tokio-stream = "0.1" +tracing = "0.1.27" + +metrics = { path = "../libs/metrics" } +utils = { path = "../libs/utils" } +workspace_hack = { version = "0.1", path = "../workspace_hack" } + +[build-dependencies] +tonic-build = "0.8" + +[[bench]] +name = "rps" +harness = false + diff --git a/storage_broker/benches/rps.rs b/storage_broker/benches/rps.rs new file mode 100644 index 0000000000..0a72adc948 --- /dev/null +++ b/storage_broker/benches/rps.rs @@ -0,0 +1,174 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use clap::Parser; +use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey; +use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; +use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest}; +use storage_broker::BrokerClientChannel; +use storage_broker::DEFAULT_LISTEN_ADDR; +use tokio::time; + +use tonic::Request; + +const ABOUT: &str = r#" +A simple benchmarking tool for storage_broker. Creates specified number of per +timeline publishers and subscribers; each publisher continiously sends +messages, subscribers read them. Each second the tool outputs number of +messages summed across all subscribers and min number of messages +recevied by single subscriber. + +For example, +cargo build -r -p storage_broker && target/release/storage_broker +cargo bench --bench rps -- -s 1 -p 1 +"#; + +#[derive(Parser, Debug)] +#[clap(author, version, about = ABOUT)] +struct Args { + /// Number of publishers + #[clap(short = 'p', long, value_parser, default_value_t = 1)] + num_pubs: u64, + /// Number of subscribers + #[clap(short = 's', long, value_parser, default_value_t = 1)] + num_subs: u64, + // Fake value to satisfy `cargo bench` passing it. + #[clap(long)] + bench: bool, +} + +async fn progress_reporter(counters: Vec>) { + let mut interval = time::interval(Duration::from_millis(1000)); + let mut c_old = counters.iter().map(|c| c.load(Ordering::Relaxed)).sum(); + let mut c_min_old = counters + .iter() + .map(|c| c.load(Ordering::Relaxed)) + .min() + .unwrap_or(0); + let mut started_at = None; + let mut skipped: u64 = 0; + loop { + interval.tick().await; + let c_new = counters.iter().map(|c| c.load(Ordering::Relaxed)).sum(); + let c_min_new = counters + .iter() + .map(|c| c.load(Ordering::Relaxed)) + .min() + .unwrap_or(0); + if c_new > 0 && started_at.is_none() { + started_at = Some(Instant::now()); + skipped = c_new; + } + let avg_rps = started_at.map(|s| { + let dur = s.elapsed(); + let dur_secs = dur.as_secs() as f64 + (dur.subsec_millis() as f64) / 1000.0; + let avg_rps = (c_new - skipped) as f64 / dur_secs; + (dur, avg_rps) + }); + println!( + "sum rps {}, min rps {} total {}, total min {}, duration, avg sum rps {:?}", + c_new - c_old, + c_min_new - c_min_old, + c_new, + c_min_new, + avg_rps + ); + c_old = c_new; + c_min_old = c_min_new; + } +} + +fn tli_from_u64(i: u64) -> Vec { + let mut timeline_id = vec![0xFF; 8]; + timeline_id.extend_from_slice(&i.to_be_bytes()); + timeline_id +} + +async fn subscribe(client: Option, counter: Arc, i: u64) { + let mut client = match client { + Some(c) => c, + None => BrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR)) + .await + .unwrap(), + }; + + let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId { + tenant_id: vec![0xFF; 16], + timeline_id: tli_from_u64(i), + }); + let request = SubscribeSafekeeperInfoRequest { + subscription_key: Some(key), + }; + let mut stream = client + .subscribe_safekeeper_info(request) + .await + .unwrap() + .into_inner(); + + while let Some(_feature) = stream.message().await.unwrap() { + counter.fetch_add(1, Ordering::Relaxed); + } +} + +async fn publish(client: Option, n_keys: u64) { + let mut client = match client { + Some(c) => c, + None => BrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR)) + .await + .unwrap(), + }; + let mut counter: u64 = 0; + + // create stream producing new values + let outbound = async_stream::stream! { + loop { + let info = SafekeeperTimelineInfo { + safekeeper_id: 1, + tenant_timeline_id: Some(ProtoTenantTimelineId { + tenant_id: vec![0xFF; 16], + timeline_id: tli_from_u64(counter % n_keys), + }), + last_log_term: 0, + flush_lsn: counter, + commit_lsn: 2, + backup_lsn: 3, + remote_consistent_lsn: 4, + peer_horizon_lsn: 5, + safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(), + local_start_lsn: 0, + }; + counter += 1; + yield info; + } + }; + let response = client.publish_safekeeper_info(Request::new(outbound)).await; + println!("pub response is {:?}", response); +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + + let mut counters = Vec::with_capacity(args.num_subs as usize); + for _ in 0..args.num_subs { + counters.push(Arc::new(AtomicU64::new(0))); + } + let h = tokio::spawn(progress_reporter(counters.clone())); + + let c = BrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR)) + .await + .unwrap(); + + for i in 0..args.num_subs { + let c = Some(c.clone()); + tokio::spawn(subscribe(c, counters[i as usize].clone(), i)); + } + for _i in 0..args.num_pubs { + let c = None; + tokio::spawn(publish(c, args.num_subs as u64)); + } + + h.await?; + Ok(()) +} diff --git a/storage_broker/build.rs b/storage_broker/build.rs new file mode 100644 index 0000000000..244c7217de --- /dev/null +++ b/storage_broker/build.rs @@ -0,0 +1,7 @@ +fn main() -> Result<(), Box> { + // Generate code to deterministic location to make finding it easier. + tonic_build::configure() + .out_dir("proto/") // put generated code to proto/ + .compile(&["proto/broker.proto"], &["proto/"])?; + Ok(()) +} diff --git a/storage_broker/proto/.gitignore b/storage_broker/proto/.gitignore new file mode 100644 index 0000000000..c75b90ab1c --- /dev/null +++ b/storage_broker/proto/.gitignore @@ -0,0 +1,2 @@ +# protobuf generated code +storage_broker.rs diff --git a/storage_broker/proto/broker.proto b/storage_broker/proto/broker.proto new file mode 100644 index 0000000000..1a46896d02 --- /dev/null +++ b/storage_broker/proto/broker.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package storage_broker; + +service BrokerService { + // Subscribe to safekeeper updates. + rpc SubscribeSafekeeperInfo(SubscribeSafekeeperInfoRequest) returns (stream SafekeeperTimelineInfo) {}; + + // Publish safekeeper updates. + rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {}; +} + +message SubscribeSafekeeperInfoRequest { + oneof subscription_key { + google.protobuf.Empty all = 1; // subscribe to everything + TenantTimelineId tenant_timeline_id = 2; // subscribe to specific timeline + } +} + +message SafekeeperTimelineInfo { + uint64 safekeeper_id = 1; + TenantTimelineId tenant_timeline_id = 2; + // Term of the last entry. + uint64 last_log_term = 3; + // LSN of the last record. + uint64 flush_lsn = 4; + // Up to which LSN safekeeper regards its WAL as committed. + uint64 commit_lsn = 5; + // LSN up to which safekeeper has backed WAL. + uint64 backup_lsn = 6; + // LSN of last checkpoint uploaded by pageserver. + uint64 remote_consistent_lsn = 7; + uint64 peer_horizon_lsn = 8; + uint64 local_start_lsn = 9; + // A connection string to use for WAL receiving. + string safekeeper_connstr = 10; +} + +message TenantTimelineId { + bytes tenant_id = 1; + bytes timeline_id = 2; +} \ No newline at end of file diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs new file mode 100644 index 0000000000..04f93a1ebb --- /dev/null +++ b/storage_broker/src/bin/storage_broker.rs @@ -0,0 +1,557 @@ +//! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage +//! nodes messaging. +//! +//! Subscriptions to 1) single timeline 2) all timelines are possible. We could +//! add subscription to the set of timelines to save grpc streams, but testing +//! shows many individual streams is also ok. +//! +//! Message is dropped if subscriber can't consume it, not affecting other +//! subscribers. +//! +//! Only safekeeper message is supported, but it is not hard to add something +//! else with generics. +use clap::{command, Parser}; +use futures_core::Stream; +use futures_util::StreamExt; +use hyper::header::CONTENT_TYPE; +use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, StatusCode}; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::convert::Infallible; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; +use std::time::Duration; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::RecvError; +use tokio::time; +use tonic::codegen::Service; +use tonic::transport::server::Connected; +use tonic::Code; +use tonic::{Request, Response, Status}; +use tracing::*; + +use metrics::{Encoder, TextEncoder}; +use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE}; +use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer}; +use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey; +use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest}; +use storage_broker::{parse_proto_ttid, EitherBody, DEFAULT_LISTEN_ADDR}; +use utils::id::TenantTimelineId; +use utils::logging::{self, LogFormat}; +use utils::project_git_version; + +project_git_version!(GIT_VERSION); + +const DEFAULT_CHAN_SIZE: usize = 128; +const DEFAULT_HTTP2_KEEPALIVE_INTERVAL: &str = "5000ms"; + +#[derive(Parser, Debug)] +#[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)] +struct Args { + /// Endpoint to listen on. + #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)] + listen_addr: SocketAddr, + /// Size of the queue to the subscriber. + #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)] + chan_size: usize, + /// HTTP/2 keepalive interval. + #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HTTP2_KEEPALIVE_INTERVAL)] + http2_keepalive_interval: Duration, + /// Format for logging, either 'plain' or 'json'. + #[arg(long, default_value = "plain")] + log_format: String, +} + +type PubId = u64; // id of publisher for registering in maps +type SubId = u64; // id of subscriber for registering in maps + +#[derive(Copy, Clone, Debug)] +enum SubscriptionKey { + All, + Timeline(TenantTimelineId), +} + +impl SubscriptionKey { + // Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors). + pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result { + match key { + ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All), + ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => { + Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?)) + } + } + } +} + +// Channel to timeline subscribers. +struct ChanToTimelineSub { + chan: broadcast::Sender, + // Tracked separately to know when delete the shmem entry. receiver_count() + // is unhandy for that as unregistering and dropping the receiver side + // happens at different moments. + num_subscribers: u64, +} + +struct SharedState { + next_pub_id: PubId, + num_pubs: i64, + next_sub_id: SubId, + num_subs_to_timelines: i64, + chans_to_timeline_subs: HashMap, + num_subs_to_all: i64, + chan_to_all_subs: broadcast::Sender, +} + +impl SharedState { + pub fn new(chan_size: usize) -> Self { + SharedState { + next_pub_id: 0, + num_pubs: 0, + next_sub_id: 0, + num_subs_to_timelines: 0, + chans_to_timeline_subs: HashMap::new(), + num_subs_to_all: 0, + chan_to_all_subs: broadcast::channel(chan_size).0, + } + } + + // Register new publisher. + pub fn register_publisher(&mut self) -> PubId { + let pub_id = self.next_pub_id; + self.next_pub_id += 1; + self.num_pubs += 1; + NUM_PUBS.set(self.num_pubs); + pub_id + } + + // Unregister publisher. + pub fn unregister_publisher(&mut self) { + self.num_pubs -= 1; + NUM_PUBS.set(self.num_pubs); + } + + // Register new subscriber. + pub fn register_subscriber( + &mut self, + sub_key: SubscriptionKey, + chan_size: usize, + ) -> (SubId, broadcast::Receiver) { + let sub_id = self.next_sub_id; + self.next_sub_id += 1; + let sub_rx = match sub_key { + SubscriptionKey::All => { + self.num_subs_to_all += 1; + NUM_SUBS_ALL.set(self.num_subs_to_all); + self.chan_to_all_subs.subscribe() + } + SubscriptionKey::Timeline(ttid) => { + self.num_subs_to_timelines += 1; + NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines); + // Create new broadcast channel for this key, or subscriber to + // the existing one. + let chan_to_timeline_sub = + self.chans_to_timeline_subs + .entry(ttid) + .or_insert(ChanToTimelineSub { + chan: broadcast::channel(chan_size).0, + num_subscribers: 0, + }); + chan_to_timeline_sub.num_subscribers += 1; + chan_to_timeline_sub.chan.subscribe() + } + }; + (sub_id, sub_rx) + } + + // Unregister the subscriber. + pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) { + match sub_key { + SubscriptionKey::All => { + self.num_subs_to_all -= 1; + NUM_SUBS_ALL.set(self.num_subs_to_all); + } + SubscriptionKey::Timeline(ttid) => { + self.num_subs_to_timelines -= 1; + NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines); + + // Remove from the map, destroying the channel, if we are the + // last subscriber to this timeline. + + // Missing entry is a bug; we must have registered. + let chan_to_timeline_sub = self + .chans_to_timeline_subs + .get_mut(&ttid) + .expect("failed to find sub entry in shmem during unregister"); + chan_to_timeline_sub.num_subscribers -= 1; + if chan_to_timeline_sub.num_subscribers == 0 { + self.chans_to_timeline_subs.remove(&ttid); + } + } + } + } +} + +// SharedState wrapper. +#[derive(Clone)] +struct Registry { + shared_state: Arc>, + chan_size: usize, +} + +impl Registry { + // Register new publisher in shared state. + pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher { + let pub_id = self.shared_state.write().register_publisher(); + info!("publication started id={} addr={:?}", pub_id, remote_addr); + Publisher { + id: pub_id, + registry: self.clone(), + remote_addr, + } + } + + pub fn unregister_publisher(&self, publisher: &Publisher) { + self.shared_state.write().unregister_publisher(); + info!( + "publication ended id={} addr={:?}", + publisher.id, publisher.remote_addr + ); + } + + // Register new subscriber in shared state. + pub fn register_subscriber( + &self, + sub_key: SubscriptionKey, + remote_addr: SocketAddr, + ) -> Subscriber { + let (sub_id, sub_rx) = self + .shared_state + .write() + .register_subscriber(sub_key, self.chan_size); + info!( + "subscription started id={}, key={:?}, addr={:?}", + sub_id, sub_key, remote_addr + ); + Subscriber { + id: sub_id, + key: sub_key, + sub_rx, + registry: self.clone(), + remote_addr, + } + } + + // Unregister the subscriber + pub fn unregister_subscriber(&self, subscriber: &Subscriber) { + self.shared_state + .write() + .unregister_subscriber(subscriber.key); + info!( + "subscription ended id={}, key={:?}, addr={:?}", + subscriber.id, subscriber.key, subscriber.remote_addr + ); + } +} + +// Private subscriber state. +struct Subscriber { + id: SubId, + key: SubscriptionKey, + // Subscriber receives messages from publishers here. + sub_rx: broadcast::Receiver, + // to unregister itself from shared state in Drop + registry: Registry, + // for logging + remote_addr: SocketAddr, +} + +impl Drop for Subscriber { + fn drop(&mut self) { + self.registry.unregister_subscriber(self); + } +} + +// Private publisher state +struct Publisher { + id: PubId, + registry: Registry, + // for logging + remote_addr: SocketAddr, +} + +impl Publisher { + // Send msg to relevant subscribers. + pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> { + // send message to subscribers for everything + let shared_state = self.registry.shared_state.read(); + // Err means there is no subscribers, it is fine. + shared_state.chan_to_all_subs.send(msg.clone()).ok(); + + // send message to per timeline subscribers + let ttid = + parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| { + Status::new(Code::InvalidArgument, "missing tenant_timeline_id") + })?)?; + if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) { + // Err can't happen here, as tx is destroyed only after removing + // from the map the last subscriber along with tx. + subs.chan + .send(msg.clone()) + .expect("rx is still in the map with zero subscribers"); + } + Ok(()) + } +} + +impl Drop for Publisher { + fn drop(&mut self) { + self.registry.unregister_publisher(self); + } +} + +struct Broker { + registry: Registry, +} + +#[tonic::async_trait] +impl BrokerService for Broker { + async fn publish_safekeeper_info( + &self, + request: Request>, + ) -> Result, Status> { + let remote_addr = request + .remote_addr() + .expect("TCPConnectInfo inserted by handler"); + let mut publisher = self.registry.register_publisher(remote_addr); + + let mut stream = request.into_inner(); + + loop { + match stream.next().await { + Some(Ok(msg)) => publisher.send_msg(&msg)?, + Some(Err(e)) => return Err(e), // grpc error from the stream + None => break, // closed stream + } + } + + Ok(Response::new(())) + } + + type SubscribeSafekeeperInfoStream = + Pin> + Send + 'static>>; + + async fn subscribe_safekeeper_info( + &self, + request: Request, + ) -> Result, Status> { + let remote_addr = request + .remote_addr() + .expect("TCPConnectInfo inserted by handler"); + let proto_key = request + .into_inner() + .subscription_key + .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?; + let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?; + let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr); + + // transform rx into stream with item = Result, as method result demands + let output = async_stream::try_stream! { + let mut warn_interval = time::interval(Duration::from_millis(1000)); + let mut missed_msgs: u64 = 0; + loop { + match subscriber.sub_rx.recv().await { + Ok(info) => yield info, + Err(RecvError::Lagged(skipped_msg)) => { + missed_msgs += skipped_msg; + if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) { + warn!("dropped {} messages, channel is full", missed_msgs); + missed_msgs = 0; + } + } + Err(RecvError::Closed) => { + // can't happen, we never drop the channel while there is a subscriber + Err(Status::new(Code::Internal, "channel unexpectantly closed"))?; + } + } + } + }; + + Ok(Response::new( + Box::pin(output) as Self::SubscribeSafekeeperInfoStream + )) + } +} + +// We serve only metrics and healthcheck through http1. +async fn http1_handler( + req: hyper::Request, +) -> Result, Infallible> { + let resp = match (req.method(), req.uri().path()) { + (&Method::GET, "/metrics") => { + let mut buffer = vec![]; + let metrics = metrics::gather(); + let encoder = TextEncoder::new(); + encoder.encode(&metrics, &mut buffer).unwrap(); + + hyper::Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() + } + (&Method::GET, "/status") => hyper::Response::builder() + .status(StatusCode::OK) + .body(Body::empty()) + .unwrap(), + _ => hyper::Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap(), + }; + Ok(resp) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + + logging::init(LogFormat::from_config(&args.log_format)?)?; + info!("version: {GIT_VERSION}"); + + let registry = Registry { + shared_state: Arc::new(RwLock::new(SharedState::new(args.chan_size))), + chan_size: args.chan_size, + }; + let storage_broker_impl = Broker { + registry: registry.clone(), + }; + let storage_broker_server = BrokerServiceServer::new(storage_broker_impl); + + info!("listening on {}", &args.listen_addr); + + // grpc is served along with http1 for metrics on a single port, hence we + // don't use tonic's Server. + hyper::Server::bind(&args.listen_addr) + .http2_keep_alive_interval(Some(args.http2_keepalive_interval)) + .serve(make_service_fn(move |conn: &AddrStream| { + let storage_broker_server_cloned = storage_broker_server.clone(); + let connect_info = conn.connect_info(); + async move { + Ok::<_, Infallible>(service_fn(move |mut req| { + // That's what tonic's MakeSvc.call does to pass conninfo to + // the request handler (and where its request.remote_addr() + // expects it to find). + req.extensions_mut().insert(connect_info.clone()); + + // Technically this second clone is not needed, but consume + // by async block is apparently unavoidable. BTW, error + // message is enigmatic, see + // https://github.com/rust-lang/rust/issues/68119 + // + // We could get away without async block at all, but then we + // need to resort to futures::Either to merge the result, + // which doesn't caress an eye as well. + let mut storage_broker_server_svc = storage_broker_server_cloned.clone(); + async move { + if req.headers().get("content-type").map(|x| x.as_bytes()) + == Some(b"application/grpc") + { + let res_resp = storage_broker_server_svc.call(req).await; + // Grpc and http1 handlers have slightly different + // Response types: it is UnsyncBoxBody for the + // former one (not sure why) and plain hyper::Body + // for the latter. Both implement HttpBody though, + // and EitherBody is used to merge them. + res_resp.map(|resp| resp.map(EitherBody::Left)) + } else { + let res_resp = http1_handler(req).await; + res_resp.map(|resp| resp.map(EitherBody::Right)) + } + } + })) + } + })) + .await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; + use tokio::sync::broadcast::error::TryRecvError; + use utils::id::{TenantId, TimelineId}; + + fn msg(timeline_id: Vec) -> SafekeeperTimelineInfo { + SafekeeperTimelineInfo { + safekeeper_id: 1, + tenant_timeline_id: Some(ProtoTenantTimelineId { + tenant_id: vec![0x00; 16], + timeline_id, + }), + last_log_term: 0, + flush_lsn: 1, + commit_lsn: 2, + backup_lsn: 3, + remote_consistent_lsn: 4, + peer_horizon_lsn: 5, + safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(), + local_start_lsn: 0, + } + } + + fn tli_from_u64(i: u64) -> Vec { + let mut timeline_id = vec![0xFF; 8]; + timeline_id.extend_from_slice(&i.to_be_bytes()); + timeline_id + } + + fn mock_addr() -> SocketAddr { + "127.0.0.1:8080".parse().unwrap() + } + + #[tokio::test] + async fn test_registry() { + let registry = Registry { + shared_state: Arc::new(RwLock::new(SharedState::new(16))), + chan_size: 16, + }; + + // subscribe to timeline 2 + let ttid_2 = TenantTimelineId { + tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(), + timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(), + }; + let sub_key_2 = SubscriptionKey::Timeline(ttid_2); + let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr()); + let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr()); + + // send two messages with different keys + let msg_1 = msg(tli_from_u64(1)); + let msg_2 = msg(tli_from_u64(2)); + let mut publisher = registry.register_publisher(mock_addr()); + publisher.send_msg(&msg_1).expect("failed to send msg"); + publisher.send_msg(&msg_2).expect("failed to send msg"); + + // msg with key 2 should arrive to subscriber_2 + assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2); + + // but nothing more + assert_eq!( + subscriber_2.sub_rx.try_recv().unwrap_err(), + TryRecvError::Empty + ); + + // subscriber_all should receive both messages + assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1); + assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2); + assert_eq!( + subscriber_all.sub_rx.try_recv().unwrap_err(), + TryRecvError::Empty + ); + } +} diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs new file mode 100644 index 0000000000..39e72ca721 --- /dev/null +++ b/storage_broker/src/lib.rs @@ -0,0 +1,108 @@ +use hyper::body::HttpBody; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tonic::codegen::StdError; +use tonic::{transport::Channel, Code, Status}; +use utils::id::{TenantId, TenantTimelineId, TimelineId}; + +use proto::{ + broker_service_client::BrokerServiceClient, TenantTimelineId as ProtoTenantTimelineId, +}; + +// Code generated by protobuf. +pub mod proto { + include!("../proto/storage_broker.rs"); +} + +pub mod metrics; + +// Re-exports to avoid direct tonic dependency in user crates. +pub use tonic::Request; +pub use tonic::Streaming; + +pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051"; + +// NeonBrokerClient charged with tonic provided Channel transport; helps to +// avoid depending on tonic directly in user crates. +pub type BrokerClientChannel = BrokerServiceClient; + +impl BrokerClientChannel { + /// Create a new client to the given endpoint, but don't actually connect until the first request. + pub async fn connect_lazy(dst: D) -> Result + where + D: std::convert::TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy(); + Ok(Self::new(conn)) + } +} + +// parse variable length bytes from protobuf +pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result { + let tenant_id = TenantId::from_slice(&proto_ttid.tenant_id) + .map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?; + let timeline_id = TimelineId::from_slice(&proto_ttid.timeline_id).map_err(|e| { + Status::new( + Code::InvalidArgument, + format!("malformed timeline_id: {}", e), + ) + })?; + Ok(TenantTimelineId { + tenant_id, + timeline_id, + }) +} + +// These several usages don't justify anyhow dependency, though it would work as +// well. +type AnyError = Box; + +// Provides impl HttpBody for two different types implementing it. Inspired by +// https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs +pub enum EitherBody { + Left(A), + Right(B), +} + +impl HttpBody for EitherBody +where + A: HttpBody + Send + Unpin, + B: HttpBody + Send + Unpin, + A::Error: Into, + B::Error: Into, +{ + type Data = A::Data; + type Error = Box; + + fn is_end_stream(&self) -> bool { + match self { + EitherBody::Left(b) => b.is_end_stream(), + EitherBody::Right(b) => b.is_end_stream(), + } + } + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + match self.get_mut() { + EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err), + EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err), + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + match self.get_mut() { + EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), + EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), + } + } +} + +fn map_option_err>(err: Option>) -> Option> { + err.map(|e| e.map_err(Into::into)) +} diff --git a/storage_broker/src/metrics.rs b/storage_broker/src/metrics.rs new file mode 100644 index 0000000000..f0649d0f68 --- /dev/null +++ b/storage_broker/src/metrics.rs @@ -0,0 +1,25 @@ +//! Broker metrics. + +use metrics::{register_int_gauge, IntGauge}; +use once_cell::sync::Lazy; + +pub static NUM_PUBS: Lazy = Lazy::new(|| { + register_int_gauge!("storage_broker_active_publishers", "Number of publications") + .expect("Failed to register metric") +}); + +pub static NUM_SUBS_TIMELINE: Lazy = Lazy::new(|| { + register_int_gauge!( + "storage_broker_per_timeline_active_subscribers", + "Number of subsciptions to particular tenant timeline id" + ) + .expect("Failed to register metric") +}); + +pub static NUM_SUBS_ALL: Lazy = Lazy::new(|| { + register_int_gauge!( + "storage_broker_all_keys_active_subscribers", + "Number of subsciptions to all keys" + ) + .expect("Failed to register metric") +}); diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 2daa08c9b6..00c590a0ca 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -17,13 +17,13 @@ ahash = { version = "0.7", features = ["std"] } anyhow = { version = "1", features = ["backtrace", "std"] } bytes = { version = "1", features = ["serde", "std"] } chrono = { version = "0.4", features = ["clock", "iana-time-zone", "js-sys", "oldtime", "serde", "std", "time", "wasm-bindgen", "wasmbind", "winapi"] } -clap = { version = "4", features = ["color", "error-context", "help", "std", "string", "suggestions", "usage"] } +clap = { version = "4", features = ["color", "derive", "error-context", "help", "std", "string", "suggestions", "usage"] } crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] } either = { version = "1", features = ["use_std"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] } futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] } futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] } -futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] } +futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] } hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] } indexmap = { version = "1", default-features = false, features = ["std"] } libc = { version = "0.2", features = ["extra_traits", "std"] } @@ -33,7 +33,8 @@ nom = { version = "7", features = ["alloc", "std"] } num-bigint = { version = "0.4", features = ["std"] } num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] } num-traits = { version = "0.2", features = ["i128", "libm", "std"] } -prost = { version = "0.10", features = ["prost-derive", "std"] } +prost-93f6ce9d446188ac = { package = "prost", version = "0.10", features = ["prost-derive", "std"] } +prost-a6292c17cd707f01 = { package = "prost", version = "0.11", features = ["prost-derive", "std"] } rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } @@ -58,7 +59,8 @@ libc = { version = "0.2", features = ["extra_traits", "std"] } log = { version = "0.4", default-features = false, features = ["serde", "std"] } memchr = { version = "2", features = ["std"] } nom = { version = "7", features = ["alloc", "std"] } -prost = { version = "0.10", features = ["prost-derive", "std"] } +prost-93f6ce9d446188ac = { package = "prost", version = "0.10", features = ["prost-derive", "std"] } +prost-a6292c17cd707f01 = { package = "prost", version = "0.11", features = ["prost-derive", "std"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }