diff --git a/Cargo.lock b/Cargo.lock index 7659be6c92..9d98206247 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -452,13 +452,28 @@ checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750" dependencies = [ "atty", "bitflags", + "clap_derive", "clap_lex", "indexmap", + "once_cell", "strsim", "termcolor", "textwrap", ] +[[package]] +name = "clap_derive" +version = "3.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "clap_lex" version = "0.2.4" @@ -789,9 +804,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19f39818dcfc97d45b03953c1292efc4e80954e1583c4aa770bac1383e2310a4" +checksum = "3f83d0ebf42c6eafb8d7c52f7e5f2d3003b89c7aa4fd2b79229209459a849af8" dependencies = [ "cc", "cxxbridge-flags", @@ -801,9 +816,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e580d70777c116df50c390d1211993f62d40302881e54d4b79727acb83d0199" +checksum = "07d050484b55975889284352b0ffc2ecbda25c0c55978017c132b29ba0818a86" dependencies = [ "cc", "codespan-reporting", @@ -816,15 +831,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56a46460b88d1cec95112c8c363f0e2c39afdb237f60583b0b36343bf627ea9c" +checksum = "99d2199b00553eda8012dfec8d3b1c75fce747cf27c169a270b3b99e3448ab78" [[package]] name = "cxxbridge-macro" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747b608fecf06b0d72d440f27acc99288207324b793be2c17991839f3d4995ea" +checksum = "dcb67a6de1f602736dd7eaead0080cf3435df806c61b24b13328db128c58868f" dependencies = [ "proc-macro2", "quote", @@ -1002,11 +1017,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fb8664f6ea68aba5503d42dd1be786b0f1bd9b7972e7f40208c83ef74db91bf" dependencies = [ "http", - "prost", + "prost 0.10.4", "tokio", "tokio-stream", - "tonic", - "tonic-build", + "tonic 0.7.2", + "tonic-build 0.7.2", "tower", "tower-service", ] @@ -1516,9 +1531,9 @@ dependencies = [ [[package]] name = "iana-time-zone-haiku" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde6edd6cef363e9359ed3c98ba64590ba9eecba2293eb5a723ab32aee8926aa" +checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca" dependencies = [ "cxx", "cxx-build", @@ -1644,9 +1659,9 @@ dependencies = [ [[package]] name = "kqueue" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e" +checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98" dependencies = [ "kqueue-sys", "libc", @@ -1872,6 +1887,22 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "546c37ac5d9e56f55e73b677106873d9d9f5190605e41a856503623648488cae" +[[package]] +name = "neon_broker" +version = "0.1.0" +dependencies = [ + "async-stream", + "clap", + "futures-core", + "futures-util", + "prost 0.11.0", + "tokio", + "tokio-stream", + "tonic 0.8.2", + "tonic-build 0.8.2", + "utils", +] + [[package]] name = "nix" version = "0.23.1" @@ -2416,14 +2447,38 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "prettyplease" -version = "0.1.20" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fead41e178796ef8274dc612a7d8ce4c7e10ca35cd2c5b5ad24cac63aeb6c0" +checksum = "c142c0e46b57171fe0c528bee8c5b7569e80f0c17e377cd0e30ea57dbc11bb51" dependencies = [ "proc-macro2", "syn", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -2432,9 +2487,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.46" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b" +checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" dependencies = [ "unicode-ident", ] @@ -2475,7 +2530,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71adf41db68aa0daaefc69bb30bcd68ded9b9abaad5d1fbb6304c4fb390e083e" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.10.1", +] + +[[package]] +name = "prost" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" +dependencies = [ + "bytes", + "prost-derive 0.11.0", ] [[package]] @@ -2493,8 +2558,28 @@ dependencies = [ "log", "multimap", "petgraph", - "prost", - "prost-types", + "prost 0.10.4", + "prost-types 0.10.1", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-build" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost 0.11.0", + "prost-types 0.11.1", "regex", "tempfile", "which", @@ -2513,6 +2598,19 @@ dependencies = [ "syn", ] +[[package]] +name = "prost-derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "prost-types" version = "0.10.1" @@ -2520,7 +2618,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d0a014229361011dc8e69c8a1ec6c2e8d0f2af7c91e3ea3f5b2170298461e68" dependencies = [ "bytes", - "prost", + "prost 0.10.4", +] + +[[package]] +name = "prost-types" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e" +dependencies = [ + "bytes", + "prost 0.11.0", ] [[package]] @@ -3755,8 +3863,40 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", - "prost-derive", + "prost 0.10.4", + "prost-derive 0.10.1", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55b9af819e54b8f33d453655bef9b9acc171568fb49523078d0cc4e7484200ec" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost 0.11.0", + "prost-derive 0.11.0", "tokio", "tokio-stream", "tokio-util", @@ -3775,7 +3915,20 @@ checksum = "d9263bf4c9bfaae7317c1c2faf7f18491d2fe476f70c414b73bf5d445b00ffa1" dependencies = [ "prettyplease", "proc-macro2", - "prost-build", + "prost-build 0.10.4", + "quote", + "syn", +] + +[[package]] +name = "tonic-build" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c6fd7c2581e36d63388a9e04c350c21beb7a8b059580b2e93993c526899ddc" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build 0.11.1", "quote", "syn", ] @@ -4311,7 +4464,7 @@ dependencies = [ "num-bigint", "num-integer", "num-traits", - "prost", + "prost 0.10.4", "rand", "regex", "regex-syntax", diff --git a/Cargo.toml b/Cargo.toml index 32c243bf44..0e6d8101cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ cargo-features = ["named-profiles"] [workspace] members = [ + "broker", "compute_tools", "control_plane", "pageserver", diff --git a/broker/Cargo.lock b/broker/Cargo.lock new file mode 100644 index 0000000000..ccf17d9d93 --- /dev/null +++ b/broker/Cargo.lock @@ -0,0 +1,1120 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "anyhow" +version = "1.0.62" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1485d4d2cc45e7b201ee3767015c96faa5904387c9d87c6efdd0fb511f12d305" + +[[package]] +name = "async-stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "async-trait" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "axum" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9de18bc5f2e9df8f52da03856bf40e29b747de5a84e43aefff90e3dc4a21529b" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4f44a0e6200e9d11a1cdc989e4b358f6e3d354fbf48478f345a17f4e43f8635" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", +] + +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "3.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e724a68d9319343bb3328c9cc2dfde263f4b3142ee1059a9980580171c954b" +dependencies = [ + "atty", + "bitflags", + "clap_derive", + "clap_lex", + "indexmap", + "once_cell", + "strsim", + "termcolor", + "textwrap", +] + +[[package]] +name = "clap_derive" +version = "3.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13547f7012c01ab4a0e8f8967730ada8f9fdf419e8b6c792788f39cf4e46eefa" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +dependencies = [ + "os_str_bytes", +] + +[[package]] +name = "either" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" + +[[package]] +name = "fastrand" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a407cfaa3385c4ae6b23e84623d48c2798d06e3e6a1878f7f59f17b3f86499" +dependencies = [ + "instant", +] + +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "futures-channel" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115" + +[[package]] +name = "futures-macro" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0db9cce532b0eae2ccf2766ab246f114b56b9cf6d445e00c2549fbc100ca045d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca0bae1fe9752cf7fd9b0064c674ae63f97b37bc714d745cbde0afb7ec4e6765" + +[[package]] +name = "futures-task" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "842fc63b931f4056a24d59de13fb1272134ce261816e063e634ad0c15cdc5306" + +[[package]] +name = "futures-util" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "h2" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "http" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + +[[package]] +name = "httparse" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + +[[package]] +name = "indexmap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +dependencies = [ + "autocfg", + "hashbrown", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "itertools" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" +dependencies = [ + "either", +] + +[[package]] +name = "itoa" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.132" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5" + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] +name = "mio" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys", +] + +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e" + +[[package]] +name = "os_str_bytes" +version = "6.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" + +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "petgraph" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" +dependencies = [ + "fixedbitset", + "indexmap", +] + +[[package]] +name = "pin-project" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "ppv-lite86" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" + +[[package]] +name = "prettyplease" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "697ae720ee02011f439e0701db107ffe2916d83f718342d65d7f8bf7b8a5fee9" +dependencies = [ + "proc-macro2", + "syn", +] + +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + +[[package]] +name = "proc-macro2" +version = "1.0.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "prost" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost", + "prost-types", + "regex", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e" +dependencies = [ + "bytes", + "prost", +] + +[[package]] +name = "quote" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" +dependencies = [ + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" + +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + +[[package]] +name = "serde" +version = "1.0.143" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53e8e5d5b70924f74ff5c6d64d9a5acd91422117c60f48c4e07855238a254553" + +[[package]] +name = "slab" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +dependencies = [ + "autocfg", +] + +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + +[[package]] +name = "syn" +version = "1.0.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + +[[package]] +name = "tempfile" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" +dependencies = [ + "cfg-if", + "fastrand", + "libc", + "redox_syscall", + "remove_dir_all", + "winapi", +] + +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" + +[[package]] +name = "tokio" +version = "1.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581" +dependencies = [ + "autocfg", + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "pin-project-lite", + "socket2", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "tonic" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498f271adc46acce75d66f639e4d35b31b2394c295c82496727dafa16d465dd2" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + +[[package]] +name = "tonic-build" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fbcd2800e34e743b9ae795867d5f77b535d3a3be69fd731e39145719752df8c" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", +] + +[[package]] +name = "tonic_bench" +version = "0.1.0" +dependencies = [ + "async-stream", + "clap", + "futures-core", + "futures-util", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" +dependencies = [ + "cfg-if", + "log", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +dependencies = [ + "once_cell", +] + +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "unicode-ident" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf" + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "which" +version = "4.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae" +dependencies = [ + "either", + "lazy_static", + "libc", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" diff --git a/broker/Cargo.toml b/broker/Cargo.toml new file mode 100644 index 0000000000..b10ad110a1 --- /dev/null +++ b/broker/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "neon_broker" +version = "0.1.0" +edition = "2021" + +[features] +bench = [] + +[[bin]] +name = "neon_broker" +path = "src/broker.rs" + +[[bin]] +name = "neon_broker_bench" +path = "src/bench.rs" +# build benchmarking binary only if explicitly requested with '--feature bench' +# required-features = ["bench"] + +[dependencies] +async-stream = "0.3" +futures-core = "0.3" +futures-util = "0.3" +tonic = "0.8" +prost = "0.11" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +# for exploring with tokio-console +# tokio = { version = "1", features = ["full", "tracing"] } +# console-subscriber = "0.1.8" +tokio-stream = "0.1" +clap = { version = "3.2.17", features = ["derive"] } + +utils = { path = "../libs/utils" } + +[build-dependencies] +tonic-build = "0.8" diff --git a/broker/build.rs b/broker/build.rs new file mode 100644 index 0000000000..d7d1e67270 --- /dev/null +++ b/broker/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/broker.proto")?; + Ok(()) +} diff --git a/broker/proto/broker.proto b/broker/proto/broker.proto new file mode 100644 index 0000000000..10729c9862 --- /dev/null +++ b/broker/proto/broker.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package neon_broker; + +service NeonBroker { + // Subscribe to safekeeper updates. + rpc SubscribeSafekeeperInfo(SubscribeSafekeeperInfoRequest) returns (stream SafekeeperTimelineInfo) {}; + + // Publish safekeeper updates. + rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (Empty) {}; +} + +message SubscribeSafekeeperInfoRequest { + oneof subscription_key { + Empty all = 1; // subscribe to everything + TenantTimelineId tenant_timeline_id = 2; // subscribe to specific timeline + } +} + +message SafekeeperTimelineInfo { + uint64 safekeeper_id = 1; + TenantTimelineId tenant_timeline_id = 2; + uint64 last_log_term = 3; + uint64 flush_lsn = 4; + uint64 commit_lsn = 5; + uint64 backup_lsn = 6; + uint64 remote_consistent_lsn = 7; + uint64 peer_horizon_lsn = 8; + string safekeeper_connstr = 9; +} + +message TenantTimelineId { + bytes tenant_id = 1; + bytes timeline_id = 2; +} + +message Empty { +} \ No newline at end of file diff --git a/broker/readme.md b/broker/readme.md new file mode 100644 index 0000000000..5d85d20687 --- /dev/null +++ b/broker/readme.md @@ -0,0 +1,4 @@ +``` +cargo build -r -p neon_broker --features bench && target/release/neon_broker +target/release/neon_broker_bench -s 1 -p 1 +``` \ No newline at end of file diff --git a/broker/src/bench.rs b/broker/src/bench.rs new file mode 100644 index 0000000000..79018e17c5 --- /dev/null +++ b/broker/src/bench.rs @@ -0,0 +1,179 @@ +pub mod neon_broker { + tonic::include_proto!("neon_broker"); +} + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use clap::Parser; +use neon_broker::neon_broker_client::NeonBrokerClient; +use neon_broker::subscribe_safekeeper_info_request::SubscriptionKey; +use neon_broker::TenantTimelineId as ProtoTenantTimelineId; +use neon_broker::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest}; +use tokio::time::{self, sleep}; + +use tonic::transport::Channel; +use tonic::Request; + +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +struct Args { + /// Number of publishers + #[clap(short = 'p', long, value_parser, default_value_t = 1)] + num_pubs: u64, + /// Number of subscribers + #[clap(short = 's', long, value_parser, default_value_t = 1)] + num_subs: u64, +} + +async fn progress_reporter(counters: Vec>) { + let mut interval = time::interval(Duration::from_millis(1000)); + let mut c_old = counters.iter().map(|c| c.load(Ordering::Relaxed)).sum(); + let mut c_min_old = counters + .iter() + .map(|c| c.load(Ordering::Relaxed)) + .min() + .unwrap_or(0); + let mut started_at = None; + let mut skipped: u64 = 0; + loop { + interval.tick().await; + // print!( + // "cnts are {:?}", + // counters + // .iter() + // .map(|c| c.load(Ordering::Relaxed)) + // .collect::>() + // ); + let c_new = counters.iter().map(|c| c.load(Ordering::Relaxed)).sum(); + let c_min_new = counters + .iter() + .map(|c| c.load(Ordering::Relaxed)) + .min() + .unwrap_or(0); + if c_new > 0 && started_at.is_none() { + started_at = Some(Instant::now()); + skipped = c_new; + } + let avg_rps = started_at.map(|s| { + let dur = s.elapsed(); + let dur_secs = dur.as_secs() as f64 + (dur.subsec_millis() as f64) / 1000.0; + let avg_rps = (c_new - skipped) as f64 / dur_secs; + (dur, avg_rps) + }); + println!( + "sum rps {}, min rps {} total {}, total min {}, duration, avg sum rps {:?}", + c_new - c_old, + c_min_new - c_min_old, + c_new, + c_min_new, + avg_rps + ); + c_old = c_new; + c_min_old = c_min_new; + } +} + +fn tli_from_u64(i: u64) -> Vec { + let mut timeline_id = vec![0xFF; 8]; + timeline_id.extend_from_slice(&i.to_be_bytes()); + timeline_id +} + +async fn subscribe(client: Option>, counter: Arc, i: u64) { + let mut client = match client { + Some(c) => c, + None => NeonBrokerClient::connect("http://[::1]:50051") + .await + .unwrap(), + }; + + // let key = SubscriptionKey::All(Empty {}); + let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId { + tenant_id: vec![0xFF; 16], + timeline_id: tli_from_u64(i), + }); + let request = SubscribeSafekeeperInfoRequest { + subscription_key: Some(key), + }; + let mut stream = client + .subscribe_safekeeper_info(request) + .await + .unwrap() + .into_inner(); + + while let Some(_feature) = stream.message().await.unwrap() { + counter.fetch_add(1, Ordering::Relaxed); + // println!("info = {:?}, client {}", _feature, i); + } +} + +async fn publish(client: Option>, n_keys: u64) { + let mut client = match client { + Some(c) => c, + None => NeonBrokerClient::connect("http://[::1]:50051") + .await + .unwrap(), + }; + let mut counter: u64 = 0; + + // create stream producing new values + let outbound = async_stream::stream! { + loop { + let info = SafekeeperTimelineInfo { + safekeeper_id: 1, + tenant_timeline_id: Some(ProtoTenantTimelineId { + tenant_id: vec![0xFF; 16], + timeline_id: tli_from_u64(counter % n_keys), + }), + last_log_term: 0, + flush_lsn: counter, + commit_lsn: 2, + backup_lsn: 3, + remote_consistent_lsn: 4, + peer_horizon_lsn: 5, + safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(), + }; + counter += 1; + // println!("sending info = {:?}", info); + // if counter >= 1000 { + // break; + // } + yield info; + // sleep(Duration::from_millis(100)).await; + } + }; + let _response = client + .publish_safekeeper_info(Request::new(outbound)) + .await + .unwrap(); +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + + let mut counters = Vec::with_capacity(args.num_subs as usize); + for _ in 0..args.num_subs { + counters.push(Arc::new(AtomicU64::new(0))); + } + let h = tokio::spawn(progress_reporter(counters.clone())); + + let c = NeonBrokerClient::connect("http://[::1]:50051") + .await + .unwrap(); + + for i in 0..args.num_subs { + let c = Some(c.clone()); + // let c = None; + tokio::spawn(subscribe(c, counters[i as usize].clone(), i)); + } + for _i in 0..args.num_pubs { + // let c = Some(c.clone()); + tokio::spawn(publish(None, args.num_subs as u64)); + } + + h.await?; + Ok(()) +} diff --git a/broker/src/broker.rs b/broker/src/broker.rs new file mode 100644 index 0000000000..71f9988767 --- /dev/null +++ b/broker/src/broker.rs @@ -0,0 +1,526 @@ +//! Simple pub-sub based on grpc (tonic) and Tokio mpsc for storage nodes +//! messaging. The main design goal is to avoid central synchronization during +//! normal flow, resorting to it only when pub/sub change happens. Each +//! subscriber holds mpsc for messages it sits on; tx end is sent to existing +//! publishers and saved in shared state for new ones. Publishers maintain +//! locally set of subscribers they stream messages to. +//! +//! Subscriptions to 1) single timeline 2) everything are possible. We could add +//! subscription to set of timelines to save grpc streams, but testing shows +//! many individual streams is also ok. +//! +//! Message is dropped if subscriber can't consume it, not affecting other +//! subscribers. +//! +//! Only safekeeper message is supported, but it is not hard to add something +//! else with templating. +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::time::Duration; + +use futures_core::Stream; +use futures_util::StreamExt; +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::{select, time}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::Code; +use tonic::{transport::Server, Request, Response, Status}; + +use neon_broker_proto::neon_broker_server::{NeonBroker, NeonBrokerServer}; +use neon_broker_proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey; +use neon_broker_proto::TenantTimelineId as ProtoTenantTimelineId; +use neon_broker_proto::{Empty, SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest}; +use utils::id::{TenantId, TenantTimelineId, TimelineId}; + +pub mod neon_broker_proto { + // The string specified here must match the proto package name. + // If you want to have a look at the generated code, it is at path similar to + // target/debug/build/neon_broker-0fde81d03bedc3b2/out/neon_broker.rs + tonic::include_proto!("neon_broker"); +} + +// Max size of the queue to the subscriber. +const CHAN_SIZE: usize = 256; + +type PubId = u64; // id of publisher for registering in maps +type SubId = u64; // id of subscriber for registering in maps + +#[derive(Copy, Clone)] +enum SubscriptionKey { + All, + Timeline(TenantTimelineId), +} + +impl SubscriptionKey { + // Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors). + pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result { + match key { + ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All), + ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => { + Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?)) + } + } + } +} + +// Subscriber id + tx end of the channel for messages to it. +#[derive(Clone)] +struct SubSender(SubId, Sender); + +impl fmt::Debug for SubSender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Subscription id {}", self.0) + } +} + +// Announcements subscriber sends to publisher(s) asking it to stream to the +// provided channel, or forget about it, releasing memory. +#[derive(Clone)] +enum SubAnnounce { + AddAll(Sender), // add subscription to all timelines + AddTimeline(TenantTimelineId, SubSender), // add subsciption to the specific timeline + RemoveTimeline(TenantTimelineId, SubId), // remove subscription to the specific timeline + // RemoveAll is not needed as publisher will notice closed channel while + // trying to send the next message. +} + +struct SharedState { + // Registered publishers. They sit on the rx end of these channels and + // receive through it tx handles of chans to subscribers. + // + // Note: publishers don't identify which keys they publish, so each + // publisher will receive channels to all subs and filter them before sending. + pub_txs: HashMap>, + next_pub_id: PubId, + // Registered subscribers -- when publisher joins it walks over them, + // collecting txs to send messages. + subs_to_all: HashMap>, + subs_to_timelines: HashMap>, + next_sub_id: SubId, +} + +// Utility func to remove subscription from the map +fn remove_sub( + subs_to_timelines: &mut HashMap>, + ttid: &TenantTimelineId, + sub_id: SubId, +) { + if let Some(subsenders) = subs_to_timelines.get_mut(&ttid) { + subsenders.retain(|ss| ss.0 != sub_id); + if subsenders.len() == 0 { + subs_to_timelines.remove(&ttid); + } + } + // Note that subscription might be not here if subscriber task was aborted + // earlier than it managed to notify publisher about itself. +} + +impl SharedState { + // Register new publisher. + pub fn register_publisher(&mut self, announce_tx: Sender) -> PubId { + let pub_id = self.next_pub_id; + self.next_pub_id += 1; + assert!(!self.pub_txs.contains_key(&pub_id)); + self.pub_txs.insert(pub_id, announce_tx); + pub_id + } + + pub fn unregister_publisher(&mut self, pub_id: PubId) { + assert!(self.pub_txs.contains_key(&pub_id)); + self.pub_txs.remove(&pub_id); + } + + // Register new subscriber. + // Returns list of channels through which existing publishers must be notified + // about new subscriber; we can't do it here due to risk of deadlock. + pub fn register_subscriber( + &mut self, + sub_key: SubscriptionKey, + sub_tx: Sender, + ) -> (SubId, Vec>, SubAnnounce) { + let sub_id = self.next_sub_id; + self.next_sub_id += 1; + let announce = match sub_key { + SubscriptionKey::All => { + assert!(!self.subs_to_all.contains_key(&sub_id)); + self.subs_to_all.insert(sub_id, sub_tx.clone()); + SubAnnounce::AddAll(sub_tx) + } + SubscriptionKey::Timeline(ttid) => { + match self.subs_to_timelines.entry(ttid) { + Entry::Occupied(mut o) => { + let subsenders = o.get_mut(); + subsenders.push(SubSender(sub_id, sub_tx.clone())); + } + Entry::Vacant(v) => { + v.insert(vec![SubSender(sub_id, sub_tx.clone())]); + } + } + SubAnnounce::AddTimeline(ttid, SubSender(sub_id, sub_tx)) + } + }; + // Collect existing publishers to notify them after lock is released; + // TODO: the probability of channels being full here is tiny (publisher + // always blocks listening chan), we can try sending first and resort to + // cloning if needed. + // + // Deadlock is possible only if publisher tries to access shared state + // during its lifetime, i.e. we add maintenance of set of published + // tlis. Otherwise we can just await here (but lock must be replaced + // with Tokio one). + // + // We could also just error out if some chan is full, but that needs + // cleanup of incompleted job, and notifying publishers when unregistering + // is mandatory anyway. + (sub_id, self.pub_txs.values().cloned().collect(), announce) + } + + // Unregister the subscriber. Similar to register_subscriber, returns list + // of channels through which publishers must be notified about the removal. + pub fn unregister_subscriber( + &mut self, + sub_id: SubId, + sub_key: SubscriptionKey, + ) -> Option<(Vec>, SubAnnounce)> { + // We need to notify existing publishers only about per timeline + // subscriptions, 'all' kind is detected on its own through closed + // channels. + let announce = match sub_key { + SubscriptionKey::All => { + assert!(self.subs_to_all.contains_key(&sub_id)); + self.subs_to_all.remove(&sub_id); + None + } + SubscriptionKey::Timeline(ref ttid) => { + remove_sub(&mut self.subs_to_timelines, ttid, sub_id); + Some(SubAnnounce::RemoveTimeline(*ttid, sub_id)) + } + }; + announce.map(|a| (self.pub_txs.values().cloned().collect(), a)) + } + + pub fn report(&mut self) { + println!( + "registered {} publishers, {} subs to all, {} subs to timelines", + self.pub_txs.len(), + self.subs_to_all.len(), + self.subs_to_timelines.len(), + ); + } +} + +// SharedState wrapper for post-locking operations (sending to pub_tx chans). +#[derive(Clone)] +struct Registry { + shared_state: Arc>, +} + +impl Registry { + // Register new publisher in shared state. + pub fn register_publisher(&self) -> Publisher { + let (announce_tx, announce_rx) = mpsc::channel(128); + let mut ss = self.shared_state.lock().unwrap(); + let id = ss.register_publisher(announce_tx); + let (subs_to_all, subs_to_timelines) = ( + ss.subs_to_all.values().cloned().collect(), + ss.subs_to_timelines.clone(), + ); + drop(ss); + // println!("registered publisher {}", id); + Publisher { + id, + announce_rx: announce_rx.into(), + subs_to_all, + subs_to_timelines, + registry: self.clone(), + } + } + + pub fn unregister_publisher(&self, publisher: &Publisher) { + self.shared_state + .lock() + .unwrap() + .unregister_publisher(publisher.id); + // println!("unregistered publisher {}", publisher.id); + } + + // Register new subscriber in shared state. + pub async fn register_subscriber(&self, sub_key: SubscriptionKey) -> Subscriber { + let (tx, rx) = mpsc::channel(CHAN_SIZE); + let id; + let mut pub_txs; + let announce; + { + let mut ss = self.shared_state.lock().unwrap(); + (id, pub_txs, announce) = ss.register_subscriber(sub_key, tx); + } + // Note: it is important to create Subscriber before .await. If client + // disconnects during await, which would terminate the Future we still + // need to run Subscriber's drop() which will unregister it from the + // shared state. + let subscriber = Subscriber { + id, + key: sub_key, + sub_rx: rx, + registry: self.clone(), + }; + // Notify existing publishers about new subscriber. + for pub_tx in pub_txs.iter_mut() { + // Closed channel is fine; it means publisher has gone. + pub_tx.send(announce.clone()).await.ok(); + } + // println!("registered subscriber {}", id); + subscriber + } + + // Unregister the subscriber + pub fn unregister_subscriber(&self, sub: &Subscriber) { + let mut ss = self.shared_state.lock().unwrap(); + let announce_pack = ss.unregister_subscriber(sub.id, sub.key); + drop(ss); + // Notify publishers about the removal. Apart from wanting to do it + // outside lock, here we also spin a task as Drop impl can't be async. + if let Some((mut pub_txs, announce)) = announce_pack { + tokio::spawn(async move { + for pub_tx in pub_txs.iter_mut() { + // Closed channel is fine; it means publisher has gone. + pub_tx.send(announce.clone()).await.ok(); + } + }); + } + // println!("unregistered subscriber {}", sub.id); + } + + pub async fn report(&self) { + let mut interval = time::interval(Duration::from_millis(1000)); + loop { + interval.tick().await; + self.shared_state.lock().unwrap().report(); + } + } +} + +// Private subscriber state. +struct Subscriber { + id: SubId, + key: SubscriptionKey, + // Subscriber receives messages from publishers here. + sub_rx: Receiver, + // to unregister itself from shared state in Drop + registry: Registry, +} + +impl Drop for Subscriber { + fn drop(&mut self) { + self.registry.unregister_subscriber(self); + } +} + +// Private publisher state +struct Publisher { + id: PubId, + // new subscribers request to send (or stop sending) msgs them here. + // It could be just Receiver, but weirdly it doesn't implement futures_core Stream directly. + announce_rx: ReceiverStream, + subs_to_all: Vec>, + subs_to_timelines: HashMap>, + // to unregister itself from shared state in Drop + registry: Registry, +} + +impl Publisher { + // Send msg to relevant subscribers. + pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> { + // send message to subscribers for everything + let mut cleanup_subs_to_all = false; + for sub in self.subs_to_all.iter() { + match sub.try_send(msg.clone()) { + Err(TrySendError::Full(_)) => { + // println!("dropping message, channel is full"); + } + Err(TrySendError::Closed(_)) => { + cleanup_subs_to_all = true; + } + _ => (), + } + } + // some channels got closed (subscriber gone), remove them + if cleanup_subs_to_all { + self.subs_to_all.retain(|tx| !tx.is_closed()); + } + + // send message to per timeline subscribers + let ttid = parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or(Status::new( + Code::InvalidArgument, + "missing tenant_timeline_id", + ))?)?; + if let Some(subs) = self.subs_to_timelines.get(&ttid) { + for tx in subs.iter().map(|sub_sender| &sub_sender.1) { + if let Err(TrySendError::Full(_)) = tx.try_send(msg.clone()) { + // println!("dropping message, channel is full"); + } + // closed channel is ignored here; we will be notified and remove it soon + } + } + Ok(()) + } + + // Add/remove subscriber according to sub_announce. + pub fn update_sub(&mut self, sub_announce: SubAnnounce) { + match sub_announce { + SubAnnounce::AddAll(tx) => self.subs_to_all.push(tx), + SubAnnounce::AddTimeline(ttid, sub_sender) => { + match self.subs_to_timelines.entry(ttid) { + Entry::Occupied(mut o) => { + let subsenders = o.get_mut(); + subsenders.push(sub_sender); + } + Entry::Vacant(v) => { + v.insert(vec![sub_sender]); + } + } + } + SubAnnounce::RemoveTimeline(ref ttid, sub_id) => { + remove_sub(&mut self.subs_to_timelines, ttid, sub_id); + } + } + } +} + +impl Drop for Publisher { + fn drop(&mut self) { + self.registry.unregister_publisher(self); + } +} + +struct NeonBrokerService { + registry: Registry, +} + +#[tonic::async_trait] +impl NeonBroker for NeonBrokerService { + async fn publish_safekeeper_info( + &self, + request: Request>, + ) -> Result, Status> { + let mut publisher = self.registry.register_publisher(); + + let mut stream = request.into_inner(); + + loop { + select! { + msg = stream.next() => { + match msg { + Some(Ok(msg)) => {publisher.send_msg(&msg)?;}, + Some(Err(e)) => {return Err(e);}, // grpc error from the stream + None => {break;} // closed stream + } + } + Some(announce) = publisher.announce_rx.next() => { + publisher.update_sub(announce); + } + } + } + + Ok(Response::new(Empty {})) + } + + type SubscribeSafekeeperInfoStream = + Pin> + Send + 'static>>; + + async fn subscribe_safekeeper_info( + &self, + request: Request, + ) -> Result, Status> { + let proto_key = request.into_inner().subscription_key.ok_or(Status::new( + Code::InvalidArgument, + "missing subscription key", + ))?; + let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?; + let mut subscriber = self.registry.register_subscriber(sub_key).await; + + // transform rx into stream with item = Result, as method result demands + let output = async_stream::try_stream! { + while let Some(info) = subscriber.sub_rx.recv().await { + yield info + } + + // internal generator + // let _ = subscriber.sub_rx.try_recv().ok(); + // let mut counter = 0; + // loop { + // let info = SafekeeperTimelineInfo { + // safekeeper_id: 1, + // tenant_timeline_id: Some(ProtoTenantTimelineId { + // tenant_id: vec![0xFF; 16], + // timeline_id: vec![0xFF; 16], + // // timeline_id: tli_from_u64(counter), + // }), + // last_log_term: 0, + // flush_lsn: counter, + // commit_lsn: 2, + // backup_lsn: 3, + // remote_consistent_lsn: 4, + // peer_horizon_lsn: 5, + // safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(), + // }; + // counter += 1; + // yield info; + // } + }; + + Ok(Response::new( + Box::pin(output) as Self::SubscribeSafekeeperInfoStream + )) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // console_subscriber::init(); + + let addr = "[::1]:50051".parse()?; + let registry = Registry { + shared_state: Arc::new(Mutex::new(SharedState { + pub_txs: HashMap::new(), + next_pub_id: 0, + subs_to_all: HashMap::new(), + subs_to_timelines: HashMap::new(), + next_sub_id: 0, + })), + }; + let neon_broker_service = NeonBrokerService { + registry: registry.clone(), + }; + + tokio::spawn(async move { registry.report().await }); + + Server::builder() + .http2_keepalive_interval(Some(Duration::from_millis(5000))) + .add_service(NeonBrokerServer::new(neon_broker_service)) + .serve(addr) + .await?; + + Ok(()) +} + +// parse variable length bytes from protobuf +fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result { + let tenant_id = TenantId::from_vec(&proto_ttid.tenant_id) + .map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?; + let timeline_id = TimelineId::from_vec(&proto_ttid.timeline_id).map_err(|e| { + Status::new( + Code::InvalidArgument, + format!("malformed timeline_id: {}", e), + ) + })?; + Ok(TenantTimelineId { + tenant_id, + timeline_id, + }) +} diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index f245f7c3d4..5ef4e06bd7 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -3,6 +3,13 @@ use std::{fmt, str::FromStr}; use hex::FromHex; use rand::Rng; use serde::{Deserialize, Serialize}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum IdError { + #[error("invalid id length {0}")] + VecParseError(usize), +} /// Neon ID is a 128-bit random ID. /// Used to represent various identifiers. Provides handy utility methods and impls. @@ -22,6 +29,15 @@ impl Id { Id::from(arr) } + pub fn from_vec(src: &Vec) -> Result { + if src.len() != 16 { + return Err(IdError::VecParseError(src.len())); + } + let mut zid_slice = [0u8; 16]; + zid_slice.copy_from_slice(&src); + Ok(zid_slice.into()) + } + pub fn as_arr(&self) -> [u8; 16] { self.0 } @@ -100,6 +116,10 @@ macro_rules! id_newtype { $t(Id::get_from_buf(buf)) } + pub fn from_vec(src: &Vec) -> Result<$t, IdError> { + Ok($t(Id::from_vec(src)?)) + } + pub fn as_arr(&self) -> [u8; 16] { self.0.as_arr() }