mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
19 Commits
run-clippy
...
split-prox
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
18303e4d68 | ||
|
|
3df6d368e3 | ||
|
|
b62e7c0138 | ||
|
|
a2968c6cf8 | ||
|
|
bae1288671 | ||
|
|
1254d8f56e | ||
|
|
073508493c | ||
|
|
7cb2349296 | ||
|
|
87151f9efd | ||
|
|
96fe084c57 | ||
|
|
20fdf3e19f | ||
|
|
c6b36d8171 | ||
|
|
0e8a848937 | ||
|
|
db4085fe22 | ||
|
|
0d895ba002 | ||
|
|
103f34e954 | ||
|
|
262378e561 | ||
|
|
9f38ab39c6 | ||
|
|
fa92328423 |
358
Cargo.lock
generated
358
Cargo.lock
generated
@@ -484,7 +484,7 @@ dependencies = [
|
||||
"http 0.2.9",
|
||||
"http 1.1.0",
|
||||
"once_cell",
|
||||
"p256",
|
||||
"p256 0.11.1",
|
||||
"percent-encoding",
|
||||
"ring 0.17.6",
|
||||
"sha2",
|
||||
@@ -848,6 +848,12 @@ version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"
|
||||
|
||||
[[package]]
|
||||
name = "base16ct"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.13.1"
|
||||
@@ -971,9 +977,9 @@ checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
|
||||
|
||||
[[package]]
|
||||
name = "bytemuck"
|
||||
version = "1.16.0"
|
||||
version = "1.16.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5"
|
||||
checksum = "102087e286b4677862ea56cf8fc58bb2cdfa8725c40ffb80fe3a008eb7f2fc83"
|
||||
|
||||
[[package]]
|
||||
name = "byteorder"
|
||||
@@ -1526,8 +1532,10 @@ version = "0.5.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76"
|
||||
dependencies = [
|
||||
"generic-array",
|
||||
"rand_core 0.6.4",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1621,6 +1629,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c"
|
||||
dependencies = [
|
||||
"const-oid",
|
||||
"pem-rfc7468",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
@@ -1720,6 +1729,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
|
||||
dependencies = [
|
||||
"block-buffer",
|
||||
"const-oid",
|
||||
"crypto-common",
|
||||
"subtle",
|
||||
]
|
||||
@@ -1771,11 +1781,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c"
|
||||
dependencies = [
|
||||
"der 0.6.1",
|
||||
"elliptic-curve",
|
||||
"rfc6979",
|
||||
"elliptic-curve 0.12.3",
|
||||
"rfc6979 0.3.1",
|
||||
"signature 1.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ecdsa"
|
||||
version = "0.16.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca"
|
||||
dependencies = [
|
||||
"der 0.7.8",
|
||||
"digest",
|
||||
"elliptic-curve 0.13.8",
|
||||
"rfc6979 0.4.0",
|
||||
"signature 2.2.0",
|
||||
"spki 0.7.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.8.1"
|
||||
@@ -1788,16 +1812,36 @@ version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3"
|
||||
dependencies = [
|
||||
"base16ct",
|
||||
"base16ct 0.1.1",
|
||||
"crypto-bigint 0.4.9",
|
||||
"der 0.6.1",
|
||||
"digest",
|
||||
"ff",
|
||||
"ff 0.12.1",
|
||||
"generic-array",
|
||||
"group",
|
||||
"pkcs8",
|
||||
"group 0.12.1",
|
||||
"pkcs8 0.9.0",
|
||||
"rand_core 0.6.4",
|
||||
"sec1",
|
||||
"sec1 0.3.0",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "elliptic-curve"
|
||||
version = "0.13.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47"
|
||||
dependencies = [
|
||||
"base16ct 0.2.0",
|
||||
"crypto-bigint 0.5.5",
|
||||
"digest",
|
||||
"ff 0.13.0",
|
||||
"generic-array",
|
||||
"group 0.13.0",
|
||||
"pem-rfc7468",
|
||||
"pkcs8 0.10.2",
|
||||
"rand_core 0.6.4",
|
||||
"sec1 0.7.3",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -1951,6 +1995,16 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ff"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449"
|
||||
dependencies = [
|
||||
"rand_core 0.6.4",
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "filetime"
|
||||
version = "0.2.22"
|
||||
@@ -2148,6 +2202,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
|
||||
dependencies = [
|
||||
"typenum",
|
||||
"version_check",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2214,7 +2269,18 @@ version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7"
|
||||
dependencies = [
|
||||
"ff",
|
||||
"ff 0.12.1",
|
||||
"rand_core 0.6.4",
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "group"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63"
|
||||
dependencies = [
|
||||
"ff 0.13.0",
|
||||
"rand_core 0.6.4",
|
||||
"subtle",
|
||||
]
|
||||
@@ -2776,6 +2842,42 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jose-b64"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bec69375368709666b21c76965ce67549f2d2db7605f1f8707d17c9656801b56"
|
||||
dependencies = [
|
||||
"base64ct",
|
||||
"serde",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jose-jwa"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ab78e053fe886a351d67cf0d194c000f9d0dcb92906eb34d853d7e758a4b3a7"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jose-jwk"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "280fa263807fe0782ecb6f2baadc28dffc04e00558a58e33bfdb801d11fd58e7"
|
||||
dependencies = [
|
||||
"jose-b64",
|
||||
"jose-jwa",
|
||||
"p256 0.13.2",
|
||||
"p384",
|
||||
"rsa",
|
||||
"serde",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.69"
|
||||
@@ -2835,6 +2937,9 @@ name = "lazy_static"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||
dependencies = [
|
||||
"spin 0.5.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazycell"
|
||||
@@ -3204,6 +3309,23 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint-dig"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"lazy_static",
|
||||
"libm",
|
||||
"num-integer",
|
||||
"num-iter",
|
||||
"num-traits",
|
||||
"rand 0.8.5",
|
||||
"smallvec",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-complex"
|
||||
version = "0.4.4"
|
||||
@@ -3481,11 +3603,33 @@ version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594"
|
||||
dependencies = [
|
||||
"ecdsa",
|
||||
"elliptic-curve",
|
||||
"ecdsa 0.14.8",
|
||||
"elliptic-curve 0.12.3",
|
||||
"sha2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "p256"
|
||||
version = "0.13.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b"
|
||||
dependencies = [
|
||||
"ecdsa 0.16.9",
|
||||
"elliptic-curve 0.13.8",
|
||||
"primeorder",
|
||||
"sha2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "p384"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "70786f51bcc69f6a4c0360e063a4cac5419ef7c5cd5b3c99ad70f3be5ba79209"
|
||||
dependencies = [
|
||||
"elliptic-curve 0.13.8",
|
||||
"primeorder",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pagebench"
|
||||
version = "0.1.0"
|
||||
@@ -3847,6 +3991,15 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pem-rfc7468"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412"
|
||||
dependencies = [
|
||||
"base64ct",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "2.2.0"
|
||||
@@ -3863,6 +4016,29 @@ dependencies = [
|
||||
"indexmap 1.9.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pg_sni_router"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"futures",
|
||||
"git-version",
|
||||
"itertools 0.10.5",
|
||||
"pq_proto",
|
||||
"proxy-core",
|
||||
"proxy-sasl",
|
||||
"rustls 0.22.4",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"socket2 0.5.5",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-utils",
|
||||
"utils",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "phf"
|
||||
version = "0.11.1"
|
||||
@@ -3913,6 +4089,17 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pkcs1"
|
||||
version = "0.7.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f"
|
||||
dependencies = [
|
||||
"der 0.7.8",
|
||||
"pkcs8 0.10.2",
|
||||
"spki 0.7.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pkcs8"
|
||||
version = "0.9.0"
|
||||
@@ -3923,6 +4110,16 @@ dependencies = [
|
||||
"spki 0.6.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pkcs8"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
|
||||
dependencies = [
|
||||
"der 0.7.8",
|
||||
"spki 0.7.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pkg-config"
|
||||
version = "0.3.27"
|
||||
@@ -4116,6 +4313,15 @@ dependencies = [
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "primeorder"
|
||||
version = "0.13.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6"
|
||||
dependencies = [
|
||||
"elliptic-curve 0.13.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-hack"
|
||||
version = "0.5.20+deprecated"
|
||||
@@ -4230,9 +4436,38 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "proxy"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"aws-config",
|
||||
"clap",
|
||||
"futures",
|
||||
"git-version",
|
||||
"humantime",
|
||||
"itertools 0.10.5",
|
||||
"metrics",
|
||||
"pq_proto",
|
||||
"proxy-core",
|
||||
"proxy-sasl",
|
||||
"remote_storage",
|
||||
"rustls 0.22.4",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"socket2 0.5.5",
|
||||
"tikv-jemallocator",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-utils",
|
||||
"utils",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proxy-core"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"async-compression",
|
||||
"async-trait",
|
||||
"atomic-take",
|
||||
@@ -4250,11 +4485,11 @@ dependencies = [
|
||||
"consumption_metrics",
|
||||
"crossbeam-deque",
|
||||
"dashmap",
|
||||
"ecdsa 0.16.9",
|
||||
"env_logger",
|
||||
"fallible-iterator",
|
||||
"framed-websockets",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hashbrown 0.14.5",
|
||||
"hashlink",
|
||||
"hex",
|
||||
@@ -4270,12 +4505,14 @@ dependencies = [
|
||||
"indexmap 2.0.1",
|
||||
"ipnet",
|
||||
"itertools 0.10.5",
|
||||
"jose-jwa",
|
||||
"jose-jwk",
|
||||
"lasso",
|
||||
"md5",
|
||||
"measured",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"p256 0.13.2",
|
||||
"parking_lot 0.12.1",
|
||||
"parquet",
|
||||
"parquet_derive",
|
||||
@@ -4284,7 +4521,7 @@ dependencies = [
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"pq_proto",
|
||||
"prometheus",
|
||||
"proxy-sasl",
|
||||
"rand 0.8.5",
|
||||
"rand_distr",
|
||||
"rcgen",
|
||||
@@ -4296,6 +4533,7 @@ dependencies = [
|
||||
"reqwest-retry",
|
||||
"reqwest-tracing",
|
||||
"routerify",
|
||||
"rsa",
|
||||
"rstest",
|
||||
"rustc-hash",
|
||||
"rustls 0.22.4",
|
||||
@@ -4305,6 +4543,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"signature 2.2.0",
|
||||
"smallvec",
|
||||
"smol_str",
|
||||
"socket2 0.5.5",
|
||||
@@ -4312,7 +4551,6 @@ dependencies = [
|
||||
"task-local-extensions",
|
||||
"thiserror",
|
||||
"tikv-jemalloc-ctl",
|
||||
"tikv-jemallocator",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres-rustls",
|
||||
@@ -4335,6 +4573,35 @@ dependencies = [
|
||||
"x509-parser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proxy-sasl"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"crossbeam-deque",
|
||||
"hmac",
|
||||
"itertools 0.10.5",
|
||||
"lasso",
|
||||
"measured",
|
||||
"parking_lot 0.12.1",
|
||||
"pbkdf2",
|
||||
"postgres-protocol",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"rustls 0.22.4",
|
||||
"sha2",
|
||||
"subtle",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"workspace_hack",
|
||||
"x509-parser",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.31.0"
|
||||
@@ -4807,6 +5074,16 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rfc6979"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2"
|
||||
dependencies = [
|
||||
"hmac",
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.16.20"
|
||||
@@ -4867,6 +5144,26 @@ dependencies = [
|
||||
"archery",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rsa"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc"
|
||||
dependencies = [
|
||||
"const-oid",
|
||||
"digest",
|
||||
"num-bigint-dig",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
"pkcs1",
|
||||
"pkcs8 0.10.2",
|
||||
"rand_core 0.6.4",
|
||||
"signature 2.2.0",
|
||||
"spki 0.7.3",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rstest"
|
||||
version = "0.18.2"
|
||||
@@ -5195,10 +5492,24 @@ version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928"
|
||||
dependencies = [
|
||||
"base16ct",
|
||||
"base16ct 0.1.1",
|
||||
"der 0.6.1",
|
||||
"generic-array",
|
||||
"pkcs8",
|
||||
"pkcs8 0.9.0",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sec1"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc"
|
||||
dependencies = [
|
||||
"base16ct 0.2.0",
|
||||
"der 0.7.8",
|
||||
"generic-array",
|
||||
"pkcs8 0.10.2",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -5545,6 +5856,7 @@ version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
|
||||
dependencies = [
|
||||
"digest",
|
||||
"rand_core 0.6.4",
|
||||
]
|
||||
|
||||
@@ -7379,13 +7691,17 @@ dependencies = [
|
||||
"clap",
|
||||
"clap_builder",
|
||||
"crossbeam-utils",
|
||||
"crypto-bigint 0.5.5",
|
||||
"der 0.7.8",
|
||||
"deranged",
|
||||
"digest",
|
||||
"either",
|
||||
"fail",
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-io",
|
||||
"futures-util",
|
||||
"generic-array",
|
||||
"getrandom 0.2.11",
|
||||
"hashbrown 0.14.5",
|
||||
"hex",
|
||||
@@ -7393,6 +7709,7 @@ dependencies = [
|
||||
"hyper 0.14.26",
|
||||
"indexmap 1.9.3",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"memchr",
|
||||
@@ -7416,7 +7733,9 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"signature 2.2.0",
|
||||
"smallvec",
|
||||
"spki 0.7.3",
|
||||
"subtle",
|
||||
"syn 1.0.109",
|
||||
"syn 2.0.52",
|
||||
@@ -7527,6 +7846,7 @@ version = "1.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"zeroize_derive",
|
||||
]
|
||||
|
||||
|
||||
@@ -9,7 +9,10 @@ members = [
|
||||
"pageserver/ctl",
|
||||
"pageserver/client",
|
||||
"pageserver/pagebench",
|
||||
"proxy",
|
||||
"proxy/core",
|
||||
"proxy/sasl",
|
||||
"proxy/proxy",
|
||||
"proxy/pg_sni_router",
|
||||
"safekeeper",
|
||||
"storage_broker",
|
||||
"storage_controller",
|
||||
|
||||
@@ -22,7 +22,10 @@ feature-depth = 1
|
||||
[advisories]
|
||||
db-urls = ["https://github.com/rustsec/advisory-db"]
|
||||
yanked = "warn"
|
||||
ignore = []
|
||||
|
||||
[[advisories.ignore]]
|
||||
id = "RUSTSEC-2023-0071"
|
||||
reason = "the marvin attack only affects private key decryption, not public key signature verification"
|
||||
|
||||
# This section is considered when running `cargo deny check licenses`
|
||||
# More documentation for the licenses section can be found here:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[package]
|
||||
name = "proxy"
|
||||
name = "proxy-core"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
@@ -9,8 +9,11 @@ default = []
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
proxy-sasl = { version = "0.1", path = "../sasl" }
|
||||
|
||||
ahash.workspace = true
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-trait.workspace = true
|
||||
atomic-take.workspace = true
|
||||
@@ -30,7 +33,6 @@ dashmap.workspace = true
|
||||
env_logger.workspace = true
|
||||
framed-websockets.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
hashbrown.workspace = true
|
||||
hashlink.workspace = true
|
||||
hex.workspace = true
|
||||
@@ -51,17 +53,15 @@ md5.workspace = true
|
||||
measured = { workspace = true, features = ["lasso"] }
|
||||
metrics.workspace = true
|
||||
once_cell.workspace = true
|
||||
opentelemetry.workspace = true
|
||||
parking_lot.workspace = true
|
||||
parquet.workspace = true
|
||||
parquet_derive.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
pq_proto.workspace = true
|
||||
prometheus.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
|
||||
remote_storage = { version = "0.1", path = "../../libs/remote_storage/" }
|
||||
reqwest.workspace = true
|
||||
reqwest-middleware = { workspace = true, features = ["json"] }
|
||||
reqwest-retry.workspace = true
|
||||
@@ -73,14 +73,13 @@ rustls.workspace = true
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
sha2 = { workspace = true, features = ["asm"] }
|
||||
sha2 = { workspace = true, features = ["asm", "oid"] }
|
||||
smol_str.workspace = true
|
||||
smallvec.workspace = true
|
||||
socket2.workspace = true
|
||||
subtle.workspace = true
|
||||
task-local-extensions.workspace = true
|
||||
thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
|
||||
tokio-postgres.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
@@ -103,6 +102,14 @@ x509-parser.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
redis.workspace = true
|
||||
|
||||
# jwt stuff
|
||||
jose-jwa = "0.1.2"
|
||||
jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] }
|
||||
signature = "2"
|
||||
ecdsa = "0.16"
|
||||
p256 = "0.13"
|
||||
rsa = "0.9"
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
@@ -38,7 +38,7 @@ pub enum AuthErrorImpl {
|
||||
|
||||
/// SASL protocol errors (includes [SCRAM](crate::scram)).
|
||||
#[error(transparent)]
|
||||
Sasl(#[from] crate::sasl::Error),
|
||||
Sasl(#[from] proxy_sasl::sasl::Error),
|
||||
|
||||
#[error("Unsupported authentication method: {0}")]
|
||||
BadAuthMethod(Box<str>),
|
||||
@@ -148,3 +148,28 @@ impl ReportableError for AuthError {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for proxy_sasl::sasl::Error {
|
||||
fn to_string_client(&self) -> String {
|
||||
match self {
|
||||
proxy_sasl::sasl::Error::ChannelBindingFailed(m) => m.to_string(),
|
||||
proxy_sasl::sasl::Error::ChannelBindingBadMethod(m) => {
|
||||
format!("unsupported channel binding method {m}")
|
||||
}
|
||||
_ => "authentication protocol violation".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportableError for proxy_sasl::sasl::Error {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
proxy_sasl::sasl::Error::ChannelBindingFailed(_) => crate::error::ErrorKind::User,
|
||||
proxy_sasl::sasl::Error::ChannelBindingBadMethod(_) => crate::error::ErrorKind::User,
|
||||
proxy_sasl::sasl::Error::BadClientMessage(_) => crate::error::ErrorKind::User,
|
||||
proxy_sasl::sasl::Error::MissingBinding => crate::error::ErrorKind::Service,
|
||||
proxy_sasl::sasl::Error::Base64(_) => crate::error::ErrorKind::ControlPlane,
|
||||
proxy_sasl::sasl::Error::Io(_) => crate::error::ErrorKind::ClientDisconnect,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
mod classic;
|
||||
mod hacks;
|
||||
pub mod jwt;
|
||||
mod link;
|
||||
|
||||
use std::net::IpAddr;
|
||||
@@ -8,6 +9,7 @@ use std::time::Duration;
|
||||
|
||||
use ipnet::{Ipv4Net, Ipv6Net};
|
||||
pub use link::LinkAuthError;
|
||||
use proxy_sasl::scram;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_postgres::config::AuthKeys;
|
||||
use tracing::{info, warn};
|
||||
@@ -35,7 +37,7 @@ use crate::{
|
||||
},
|
||||
stream, url,
|
||||
};
|
||||
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};
|
||||
use crate::{EndpointCacheKey, EndpointId, RoleName};
|
||||
|
||||
/// Alternative to [`std::borrow::Cow`] but doesn't need `T: ToOwned` as we don't need that functionality
|
||||
pub enum MaybeOwned<'a, T> {
|
||||
@@ -370,8 +372,8 @@ async fn authenticate_with_secret(
|
||||
let auth_outcome =
|
||||
validate_password_and_exchange(&config.thread_pool, ep, &password, secret).await?;
|
||||
let keys = match auth_outcome {
|
||||
crate::sasl::Outcome::Success(key) => key,
|
||||
crate::sasl::Outcome::Failure(reason) => {
|
||||
proxy_sasl::sasl::Outcome::Success(key) => key,
|
||||
proxy_sasl::sasl::Outcome::Failure(reason) => {
|
||||
info!("auth backend failed with an error: {reason}");
|
||||
return Err(auth::AuthError::auth_failed(&*info.user));
|
||||
}
|
||||
@@ -557,9 +559,9 @@ mod tests {
|
||||
context::RequestMonitoring,
|
||||
proxy::NeonOptions,
|
||||
rate_limiter::{EndpointRateLimiter, RateBucketInfo},
|
||||
scram::{threadpool::ThreadPool, ServerSecret},
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
use proxy_sasl::scram::{threadpool::ThreadPool, ServerSecret};
|
||||
|
||||
use super::{auth_quirks, AuthRateLimiter};
|
||||
|
||||
@@ -668,7 +670,11 @@ mod tests {
|
||||
let ctx = RequestMonitoring::test();
|
||||
let api = Auth {
|
||||
ips: vec![],
|
||||
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
|
||||
secret: AuthSecret::Scram(
|
||||
ServerSecret::build_test_secret("my-secret-password")
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
};
|
||||
|
||||
let user_info = ComputeUserInfoMaybeEndpoint {
|
||||
@@ -745,7 +751,11 @@ mod tests {
|
||||
let ctx = RequestMonitoring::test();
|
||||
let api = Auth {
|
||||
ips: vec![],
|
||||
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
|
||||
secret: AuthSecret::Scram(
|
||||
ServerSecret::build_test_secret("my-secret-password")
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
};
|
||||
|
||||
let user_info = ComputeUserInfoMaybeEndpoint {
|
||||
@@ -797,7 +807,11 @@ mod tests {
|
||||
let ctx = RequestMonitoring::test();
|
||||
let api = Auth {
|
||||
ips: vec![],
|
||||
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
|
||||
secret: AuthSecret::Scram(
|
||||
ServerSecret::build_test_secret("my-secret-password")
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
};
|
||||
|
||||
let user_info = ComputeUserInfoMaybeEndpoint {
|
||||
@@ -5,9 +5,9 @@ use crate::{
|
||||
config::AuthenticationConfig,
|
||||
console::AuthSecret,
|
||||
context::RequestMonitoring,
|
||||
sasl,
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
use proxy_sasl::sasl;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, warn};
|
||||
|
||||
@@ -7,9 +7,9 @@ use crate::{
|
||||
console::AuthSecret,
|
||||
context::RequestMonitoring,
|
||||
intern::EndpointIdInt,
|
||||
sasl,
|
||||
stream::{self, Stream},
|
||||
};
|
||||
use proxy_sasl::sasl;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, warn};
|
||||
|
||||
554
proxy/core/src/auth/backend/jwt.rs
Normal file
554
proxy/core/src/auth/backend/jwt.rs
Normal file
@@ -0,0 +1,554 @@
|
||||
use std::{future::Future, sync::Arc, time::Duration};
|
||||
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use arc_swap::ArcSwapOption;
|
||||
use dashmap::DashMap;
|
||||
use jose_jwk::crypto::KeyInfo;
|
||||
use signature::Verifier;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::{http::parse_json_body_with_limit, intern::EndpointIdInt};
|
||||
|
||||
// TODO(conrad): make these configurable.
|
||||
const MIN_RENEW: Duration = Duration::from_secs(30);
|
||||
const AUTO_RENEW: Duration = Duration::from_secs(300);
|
||||
const MAX_RENEW: Duration = Duration::from_secs(3600);
|
||||
const MAX_JWK_BODY_SIZE: usize = 64 * 1024;
|
||||
|
||||
/// How to get the JWT auth rules
|
||||
pub trait FetchAuthRules: Clone + Send + Sync + 'static {
|
||||
fn fetch_auth_rules(&self) -> impl Future<Output = anyhow::Result<AuthRules>> + Send;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct FetchAuthRulesFromCplane {
|
||||
#[allow(dead_code)]
|
||||
endpoint: EndpointIdInt,
|
||||
}
|
||||
|
||||
impl FetchAuthRules for FetchAuthRulesFromCplane {
|
||||
async fn fetch_auth_rules(&self) -> anyhow::Result<AuthRules> {
|
||||
Err(anyhow::anyhow!("not yet implemented"))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AuthRules {
|
||||
jwks_urls: Vec<url::Url>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct JwkCache {
|
||||
client: reqwest::Client,
|
||||
|
||||
map: DashMap<EndpointIdInt, Arc<JwkCacheEntryLock>>,
|
||||
}
|
||||
|
||||
pub struct JwkCacheEntryLock {
|
||||
cached: ArcSwapOption<JwkCacheEntry>,
|
||||
lookup: tokio::sync::Semaphore,
|
||||
}
|
||||
|
||||
impl Default for JwkCacheEntryLock {
|
||||
fn default() -> Self {
|
||||
JwkCacheEntryLock {
|
||||
cached: ArcSwapOption::empty(),
|
||||
lookup: tokio::sync::Semaphore::new(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct JwkCacheEntry {
|
||||
/// Should refetch at least every hour to verify when old keys have been removed.
|
||||
/// Should refetch when new key IDs are seen only every 5 minutes or so
|
||||
last_retrieved: Instant,
|
||||
|
||||
/// cplane will return multiple JWKs urls that we need to scrape.
|
||||
key_sets: ahash::HashMap<url::Url, jose_jwk::JwkSet>,
|
||||
}
|
||||
|
||||
impl JwkCacheEntryLock {
|
||||
async fn acquire_permit<'a>(self: &'a Arc<Self>) -> JwkRenewalPermit<'a> {
|
||||
JwkRenewalPermit::acquire_permit(self).await
|
||||
}
|
||||
|
||||
fn try_acquire_permit<'a>(self: &'a Arc<Self>) -> Option<JwkRenewalPermit<'a>> {
|
||||
JwkRenewalPermit::try_acquire_permit(self)
|
||||
}
|
||||
|
||||
async fn renew_jwks<F: FetchAuthRules>(
|
||||
&self,
|
||||
_permit: JwkRenewalPermit<'_>,
|
||||
client: &reqwest::Client,
|
||||
auth_rules: &F,
|
||||
) -> anyhow::Result<Arc<JwkCacheEntry>> {
|
||||
// double check that no one beat us to updating the cache.
|
||||
let now = Instant::now();
|
||||
let guard = self.cached.load_full();
|
||||
if let Some(cached) = guard {
|
||||
let last_update = now.duration_since(cached.last_retrieved);
|
||||
if last_update < Duration::from_secs(300) {
|
||||
return Ok(cached);
|
||||
}
|
||||
}
|
||||
|
||||
let rules = auth_rules.fetch_auth_rules().await?;
|
||||
let mut key_sets = ahash::HashMap::with_capacity_and_hasher(
|
||||
rules.jwks_urls.len(),
|
||||
ahash::RandomState::new(),
|
||||
);
|
||||
// TODO(conrad): run concurrently
|
||||
for url in rules.jwks_urls {
|
||||
let req = client.get(url.clone());
|
||||
// TODO(conrad): eventually switch to using reqwest_middleware/`new_client_with_timeout`.
|
||||
match req.send().await.and_then(|r| r.error_for_status()) {
|
||||
// todo: should we re-insert JWKs if we want to keep this JWKs URL?
|
||||
// I expect these failures would be quite sparse.
|
||||
Err(e) => tracing::warn!(?url, error=?e, "could not fetch JWKs"),
|
||||
Ok(r) => {
|
||||
let resp: http::Response<reqwest::Body> = r.into();
|
||||
match parse_json_body_with_limit::<jose_jwk::JwkSet>(
|
||||
resp.into_body(),
|
||||
MAX_JWK_BODY_SIZE,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(e) => tracing::warn!(?url, error=?e, "could not decode JWKs"),
|
||||
Ok(jwks) => {
|
||||
key_sets.insert(url, jwks);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let entry = Arc::new(JwkCacheEntry {
|
||||
last_retrieved: now,
|
||||
key_sets,
|
||||
});
|
||||
self.cached.swap(Some(Arc::clone(&entry)));
|
||||
|
||||
Ok(entry)
|
||||
}
|
||||
|
||||
async fn get_or_update_jwk_cache<F: FetchAuthRules>(
|
||||
self: &Arc<Self>,
|
||||
client: &reqwest::Client,
|
||||
fetch: &F,
|
||||
) -> Result<Arc<JwkCacheEntry>, anyhow::Error> {
|
||||
let now = Instant::now();
|
||||
let guard = self.cached.load_full();
|
||||
|
||||
// if we have no cached JWKs, try and get some
|
||||
let Some(cached) = guard else {
|
||||
let permit = self.acquire_permit().await;
|
||||
return self.renew_jwks(permit, client, fetch).await;
|
||||
};
|
||||
|
||||
let last_update = now.duration_since(cached.last_retrieved);
|
||||
|
||||
// check if the cached JWKs need updating.
|
||||
if last_update > MAX_RENEW {
|
||||
let permit = self.acquire_permit().await;
|
||||
|
||||
// it's been too long since we checked the keys. wait for them to update.
|
||||
return self.renew_jwks(permit, client, fetch).await;
|
||||
}
|
||||
|
||||
// every 5 minutes we should spawn a job to eagerly update the token.
|
||||
if last_update > AUTO_RENEW {
|
||||
if let Some(permit) = self.try_acquire_permit() {
|
||||
tracing::debug!("JWKs should be renewed. Renewal permit acquired");
|
||||
let permit = permit.into_owned();
|
||||
let entry = self.clone();
|
||||
let client = client.clone();
|
||||
let fetch = fetch.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = entry.renew_jwks(permit, &client, &fetch).await {
|
||||
tracing::warn!(error=?e, "could not fetch JWKs in background job");
|
||||
}
|
||||
});
|
||||
} else {
|
||||
tracing::debug!("JWKs should be renewed. Renewal permit already taken, skipping");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(cached)
|
||||
}
|
||||
|
||||
async fn check_jwt<F: FetchAuthRules>(
|
||||
self: &Arc<Self>,
|
||||
jwt: String,
|
||||
client: &reqwest::Client,
|
||||
fetch: &F,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
// JWT compact form is defined to be
|
||||
// <B64(Header)> || . || <B64(Payload)> || . || <B64(Signature)>
|
||||
// where Signature = alg(<B64(Header)> || . || <B64(Payload)>);
|
||||
|
||||
let (header_payload, signature) = jwt
|
||||
.rsplit_once(".")
|
||||
.context("not a valid compact JWT encoding")?;
|
||||
let (header, _payload) = header_payload
|
||||
.split_once(".")
|
||||
.context("not a valid compact JWT encoding")?;
|
||||
|
||||
let header = base64::decode_config(header, base64::URL_SAFE_NO_PAD)
|
||||
.context("not a valid compact JWT encoding")?;
|
||||
let header = serde_json::from_slice::<JWTHeader>(&header)
|
||||
.context("not a valid compact JWT encoding")?;
|
||||
|
||||
ensure!(header.typ == "JWT");
|
||||
let kid = header.kid.context("missing key id")?;
|
||||
|
||||
let mut guard = self.get_or_update_jwk_cache(client, fetch).await?;
|
||||
|
||||
// get the key from the JWKs if possible. If not, wait for the keys to update.
|
||||
let jwk = loop {
|
||||
let jwk = guard
|
||||
.key_sets
|
||||
.values()
|
||||
.flat_map(|jwks| &jwks.keys)
|
||||
.find(|jwk| jwk.prm.kid.as_deref() == Some(kid));
|
||||
|
||||
match jwk {
|
||||
Some(jwk) => break jwk,
|
||||
None if guard.last_retrieved.elapsed() > MIN_RENEW => {
|
||||
let permit = self.acquire_permit().await;
|
||||
guard = self.renew_jwks(permit, client, fetch).await?;
|
||||
}
|
||||
_ => {
|
||||
bail!("jwk not found");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
ensure!(
|
||||
jwk.is_supported(&header.alg),
|
||||
"signature algorithm not supported"
|
||||
);
|
||||
|
||||
let sig = base64::decode_config(signature, base64::URL_SAFE_NO_PAD)
|
||||
.context("not a valid compact JWT encoding")?;
|
||||
match &jwk.key {
|
||||
jose_jwk::Key::Ec(key) => {
|
||||
verify_ec_signature(header_payload.as_bytes(), &sig, key)?;
|
||||
}
|
||||
jose_jwk::Key::Rsa(key) => {
|
||||
verify_rsa_signature(header_payload.as_bytes(), &sig, key, &jwk.prm.alg)?;
|
||||
}
|
||||
key => bail!("unsupported key type {key:?}"),
|
||||
};
|
||||
|
||||
// TODO(conrad): verify iss, exp, nbf, etc...
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl JwkCache {
|
||||
pub async fn check_jwt(
|
||||
&self,
|
||||
endpoint: EndpointIdInt,
|
||||
jwt: String,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
// try with just a read lock first
|
||||
let entry = self.map.get(&endpoint).as_deref().map(Arc::clone);
|
||||
let entry = match entry {
|
||||
Some(entry) => entry,
|
||||
None => {
|
||||
// acquire a write lock after to insert.
|
||||
let entry = self.map.entry(endpoint).or_default();
|
||||
Arc::clone(&*entry)
|
||||
}
|
||||
};
|
||||
|
||||
let fetch = FetchAuthRulesFromCplane { endpoint };
|
||||
entry.check_jwt(jwt, &self.client, &fetch).await
|
||||
}
|
||||
}
|
||||
|
||||
fn verify_ec_signature(data: &[u8], sig: &[u8], key: &jose_jwk::Ec) -> anyhow::Result<()> {
|
||||
use ecdsa::Signature;
|
||||
use signature::Verifier;
|
||||
|
||||
match key.crv {
|
||||
jose_jwk::EcCurves::P256 => {
|
||||
let pk =
|
||||
p256::PublicKey::try_from(key).map_err(|_| anyhow::anyhow!("invalid P256 key"))?;
|
||||
let key = p256::ecdsa::VerifyingKey::from(&pk);
|
||||
let sig = Signature::from_slice(sig)?;
|
||||
key.verify(data, &sig)?;
|
||||
}
|
||||
key => bail!("unsupported ec key type {key:?}"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn verify_rsa_signature(
|
||||
data: &[u8],
|
||||
sig: &[u8],
|
||||
key: &jose_jwk::Rsa,
|
||||
alg: &Option<jose_jwa::Algorithm>,
|
||||
) -> anyhow::Result<()> {
|
||||
use jose_jwa::{Algorithm, Signing};
|
||||
use rsa::{
|
||||
pkcs1v15::{Signature, VerifyingKey},
|
||||
RsaPublicKey,
|
||||
};
|
||||
|
||||
let key = RsaPublicKey::try_from(key).map_err(|_| anyhow::anyhow!("invalid RSA key"))?;
|
||||
|
||||
match alg {
|
||||
Some(Algorithm::Signing(Signing::Rs256)) => {
|
||||
let key = VerifyingKey::<sha2::Sha256>::new(key);
|
||||
let sig = Signature::try_from(sig)?;
|
||||
key.verify(data, &sig)?;
|
||||
}
|
||||
_ => bail!("invalid RSA signing algorithm"),
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// <https://datatracker.ietf.org/doc/html/rfc7515#section-4.1>
|
||||
#[derive(serde::Deserialize, serde::Serialize)]
|
||||
struct JWTHeader<'a> {
|
||||
/// must be "JWT"
|
||||
typ: &'a str,
|
||||
/// must be a supported alg
|
||||
alg: jose_jwa::Algorithm,
|
||||
/// key id, must be provided for our usecase
|
||||
kid: Option<&'a str>,
|
||||
}
|
||||
|
||||
struct JwkRenewalPermit<'a> {
|
||||
inner: Option<JwkRenewalPermitInner<'a>>,
|
||||
}
|
||||
|
||||
enum JwkRenewalPermitInner<'a> {
|
||||
Owned(Arc<JwkCacheEntryLock>),
|
||||
Borrowed(&'a Arc<JwkCacheEntryLock>),
|
||||
}
|
||||
|
||||
impl JwkRenewalPermit<'_> {
|
||||
fn into_owned(mut self) -> JwkRenewalPermit<'static> {
|
||||
JwkRenewalPermit {
|
||||
inner: self.inner.take().map(JwkRenewalPermitInner::into_owned),
|
||||
}
|
||||
}
|
||||
|
||||
async fn acquire_permit(from: &Arc<JwkCacheEntryLock>) -> JwkRenewalPermit {
|
||||
match from.lookup.acquire().await {
|
||||
Ok(permit) => {
|
||||
permit.forget();
|
||||
JwkRenewalPermit {
|
||||
inner: Some(JwkRenewalPermitInner::Borrowed(from)),
|
||||
}
|
||||
}
|
||||
Err(_) => panic!("semaphore should not be closed"),
|
||||
}
|
||||
}
|
||||
|
||||
fn try_acquire_permit(from: &Arc<JwkCacheEntryLock>) -> Option<JwkRenewalPermit> {
|
||||
match from.lookup.try_acquire() {
|
||||
Ok(permit) => {
|
||||
permit.forget();
|
||||
Some(JwkRenewalPermit {
|
||||
inner: Some(JwkRenewalPermitInner::Borrowed(from)),
|
||||
})
|
||||
}
|
||||
Err(tokio::sync::TryAcquireError::NoPermits) => None,
|
||||
Err(tokio::sync::TryAcquireError::Closed) => panic!("semaphore should not be closed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl JwkRenewalPermitInner<'_> {
|
||||
fn into_owned(self) -> JwkRenewalPermitInner<'static> {
|
||||
match self {
|
||||
JwkRenewalPermitInner::Owned(p) => JwkRenewalPermitInner::Owned(p),
|
||||
JwkRenewalPermitInner::Borrowed(p) => JwkRenewalPermitInner::Owned(Arc::clone(p)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for JwkRenewalPermit<'_> {
|
||||
fn drop(&mut self) {
|
||||
let entry = match &self.inner {
|
||||
None => return,
|
||||
Some(JwkRenewalPermitInner::Owned(p)) => p,
|
||||
Some(JwkRenewalPermitInner::Borrowed(p)) => *p,
|
||||
};
|
||||
entry.lookup.add_permits(1);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use std::{future::IntoFuture, net::SocketAddr, time::SystemTime};
|
||||
|
||||
use base64::URL_SAFE_NO_PAD;
|
||||
use bytes::Bytes;
|
||||
use http::Response;
|
||||
use http_body_util::Full;
|
||||
use hyper1::service::service_fn;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use rand::rngs::OsRng;
|
||||
use signature::Signer;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
fn new_ec_jwk(kid: String) -> (p256::SecretKey, jose_jwk::Jwk) {
|
||||
let sk = p256::SecretKey::random(&mut OsRng);
|
||||
let pk = sk.public_key().into();
|
||||
let jwk = jose_jwk::Jwk {
|
||||
key: jose_jwk::Key::Ec(pk),
|
||||
prm: jose_jwk::Parameters {
|
||||
kid: Some(kid),
|
||||
alg: Some(jose_jwa::Algorithm::Signing(jose_jwa::Signing::Es256)),
|
||||
..Default::default()
|
||||
},
|
||||
};
|
||||
(sk, jwk)
|
||||
}
|
||||
|
||||
fn new_rsa_jwk(kid: String) -> (rsa::RsaPrivateKey, jose_jwk::Jwk) {
|
||||
let sk = rsa::RsaPrivateKey::new(&mut OsRng, 2048).unwrap();
|
||||
let pk = sk.to_public_key().into();
|
||||
let jwk = jose_jwk::Jwk {
|
||||
key: jose_jwk::Key::Rsa(pk),
|
||||
prm: jose_jwk::Parameters {
|
||||
kid: Some(kid),
|
||||
alg: Some(jose_jwa::Algorithm::Signing(jose_jwa::Signing::Rs256)),
|
||||
..Default::default()
|
||||
},
|
||||
};
|
||||
(sk, jwk)
|
||||
}
|
||||
|
||||
fn build_jwt_payload(kid: String, sig: jose_jwa::Signing) -> String {
|
||||
let header = JWTHeader {
|
||||
typ: "JWT",
|
||||
alg: jose_jwa::Algorithm::Signing(sig),
|
||||
kid: Some(&kid),
|
||||
};
|
||||
let body = typed_json::json! {{
|
||||
"exp": SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() + 3600,
|
||||
}};
|
||||
|
||||
let header =
|
||||
base64::encode_config(serde_json::to_string(&header).unwrap(), URL_SAFE_NO_PAD);
|
||||
let body = base64::encode_config(body.to_string(), URL_SAFE_NO_PAD);
|
||||
|
||||
format!("{header}.{body}")
|
||||
}
|
||||
|
||||
fn new_ec_jwt(kid: String, key: p256::SecretKey) -> String {
|
||||
use p256::ecdsa::{Signature, SigningKey};
|
||||
|
||||
let payload = build_jwt_payload(kid, jose_jwa::Signing::Es256);
|
||||
let sig: Signature = SigningKey::from(key).sign(payload.as_bytes());
|
||||
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
|
||||
|
||||
format!("{payload}.{sig}")
|
||||
}
|
||||
|
||||
fn new_rsa_jwt(kid: String, key: rsa::RsaPrivateKey) -> String {
|
||||
use rsa::pkcs1v15::SigningKey;
|
||||
use rsa::signature::SignatureEncoding;
|
||||
|
||||
let payload = build_jwt_payload(kid, jose_jwa::Signing::Rs256);
|
||||
let sig = SigningKey::<sha2::Sha256>::new(key).sign(payload.as_bytes());
|
||||
let sig = base64::encode_config(sig.to_bytes(), URL_SAFE_NO_PAD);
|
||||
|
||||
format!("{payload}.{sig}")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn renew() {
|
||||
let (rs1, jwk1) = new_rsa_jwk("1".into());
|
||||
let (rs2, jwk2) = new_rsa_jwk("2".into());
|
||||
let (ec1, jwk3) = new_ec_jwk("3".into());
|
||||
let (ec2, jwk4) = new_ec_jwk("4".into());
|
||||
|
||||
let jwt1 = new_rsa_jwt("1".into(), rs1);
|
||||
let jwt2 = new_rsa_jwt("2".into(), rs2);
|
||||
let jwt3 = new_ec_jwt("3".into(), ec1);
|
||||
let jwt4 = new_ec_jwt("4".into(), ec2);
|
||||
|
||||
let foo_jwks = jose_jwk::JwkSet {
|
||||
keys: vec![jwk1, jwk3],
|
||||
};
|
||||
let bar_jwks = jose_jwk::JwkSet {
|
||||
keys: vec![jwk2, jwk4],
|
||||
};
|
||||
|
||||
let service = service_fn(move |req| {
|
||||
let foo_jwks = foo_jwks.clone();
|
||||
let bar_jwks = bar_jwks.clone();
|
||||
async move {
|
||||
let jwks = match req.uri().path() {
|
||||
"/foo" => &foo_jwks,
|
||||
"/bar" => &bar_jwks,
|
||||
_ => {
|
||||
return Response::builder()
|
||||
.status(404)
|
||||
.body(Full::new(Bytes::new()));
|
||||
}
|
||||
};
|
||||
let body = serde_json::to_vec(jwks).unwrap();
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.body(Full::new(Bytes::from(body)))
|
||||
}
|
||||
});
|
||||
|
||||
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
|
||||
let server = hyper1::server::conn::http1::Builder::new();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (s, _) = listener.accept().await.unwrap();
|
||||
let serve = server.serve_connection(TokioIo::new(s), service.clone());
|
||||
tokio::spawn(serve.into_future());
|
||||
}
|
||||
});
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Fetch(SocketAddr);
|
||||
|
||||
impl FetchAuthRules for Fetch {
|
||||
async fn fetch_auth_rules(&self) -> anyhow::Result<AuthRules> {
|
||||
Ok(AuthRules {
|
||||
jwks_urls: vec![
|
||||
format!("http://{}/foo", self.0).parse().unwrap(),
|
||||
format!("http://{}/bar", self.0).parse().unwrap(),
|
||||
],
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
let jwk_cache = Arc::new(JwkCacheEntryLock::default());
|
||||
|
||||
jwk_cache
|
||||
.check_jwt(jwt1, &client, &Fetch(addr))
|
||||
.await
|
||||
.unwrap();
|
||||
jwk_cache
|
||||
.check_jwt(jwt2, &client, &Fetch(addr))
|
||||
.await
|
||||
.unwrap();
|
||||
jwk_cache
|
||||
.check_jwt(jwt3, &client, &Fetch(addr))
|
||||
.await
|
||||
.unwrap();
|
||||
jwk_cache
|
||||
.check_jwt(jwt4, &client, &Fetch(addr))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
@@ -2,16 +2,17 @@
|
||||
|
||||
use super::{backend::ComputeCredentialKeys, AuthErrorImpl, PasswordHackPayload};
|
||||
use crate::{
|
||||
config::TlsServerEndPoint,
|
||||
console::AuthSecret,
|
||||
context::RequestMonitoring,
|
||||
intern::EndpointIdInt,
|
||||
sasl,
|
||||
scram::{self, threadpool::ThreadPool},
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
use postgres_protocol::authentication::sasl::{SCRAM_SHA_256, SCRAM_SHA_256_PLUS};
|
||||
use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
|
||||
use proxy_sasl::{
|
||||
sasl,
|
||||
scram::{self, threadpool::ThreadPool, TlsServerEndPoint},
|
||||
};
|
||||
use std::{io, sync::Arc};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::info;
|
||||
@@ -56,7 +57,7 @@ impl AuthMethod for PasswordHack {
|
||||
/// Use clear-text password auth called `password` in docs
|
||||
/// <https://www.postgresql.org/docs/current/auth-password.html>
|
||||
pub struct CleartextPassword {
|
||||
pub pool: Arc<ThreadPool>,
|
||||
pub pool: Arc<ThreadPool<EndpointIdInt>>,
|
||||
pub endpoint: EndpointIdInt,
|
||||
pub secret: AuthSecret,
|
||||
}
|
||||
@@ -174,7 +175,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
|
||||
}
|
||||
info!("client chooses {}", sasl.method);
|
||||
|
||||
let outcome = sasl::SaslStream::new(self.stream, sasl.message)
|
||||
let outcome = sasl::SaslStream::new(&mut self.stream.framed, sasl.message)
|
||||
.authenticate(scram::Exchange::new(
|
||||
secret,
|
||||
rand::random,
|
||||
@@ -191,7 +192,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
|
||||
}
|
||||
|
||||
pub(crate) async fn validate_password_and_exchange(
|
||||
pool: &ThreadPool,
|
||||
pool: &ThreadPool<EndpointIdInt>,
|
||||
endpoint: EndpointIdInt,
|
||||
password: &[u8],
|
||||
secret: AuthSecret,
|
||||
@@ -206,7 +207,8 @@ pub(crate) async fn validate_password_and_exchange(
|
||||
}
|
||||
// perform scram authentication as both client and server to validate the keys
|
||||
AuthSecret::Scram(scram_secret) => {
|
||||
let outcome = crate::scram::exchange(pool, endpoint, &scram_secret, password).await?;
|
||||
let outcome =
|
||||
proxy_sasl::scram::exchange(pool, endpoint, &scram_secret, password).await?;
|
||||
|
||||
let client_key = match outcome {
|
||||
sasl::Outcome::Success(client_key) => client_key,
|
||||
@@ -371,7 +371,8 @@ impl Cache for ProjectInfoCacheImpl {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{scram::ServerSecret, ProjectId};
|
||||
use crate::ProjectId;
|
||||
use proxy_sasl::scram::ServerSecret;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_project_info_cache_settings() {
|
||||
@@ -1,27 +1,26 @@
|
||||
use crate::{
|
||||
auth::{self, backend::AuthRateLimiter},
|
||||
console::locks::ApiLocks,
|
||||
intern::EndpointIdInt,
|
||||
rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig},
|
||||
scram::threadpool::ThreadPool,
|
||||
serverless::{cancel_set::CancelSet, GlobalConnPoolOptions},
|
||||
Host,
|
||||
};
|
||||
|
||||
use anyhow::{bail, ensure, Context, Ok};
|
||||
use itertools::Itertools;
|
||||
use proxy_sasl::scram::{threadpool::ThreadPool, TlsServerEndPoint};
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use rustls::{
|
||||
crypto::ring::sign,
|
||||
pki_types::{CertificateDer, PrivateKeyDer},
|
||||
};
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::{error, info};
|
||||
use x509_parser::oid_registry;
|
||||
|
||||
pub struct ProxyConfig {
|
||||
pub tls_config: Option<TlsConfig>,
|
||||
@@ -58,7 +57,7 @@ pub struct HttpConfig {
|
||||
}
|
||||
|
||||
pub struct AuthenticationConfig {
|
||||
pub thread_pool: Arc<ThreadPool>,
|
||||
pub thread_pool: Arc<ThreadPool<EndpointIdInt>>,
|
||||
pub scram_protocol_timeout: tokio::time::Duration,
|
||||
pub rate_limiter_enabled: bool,
|
||||
pub rate_limiter: AuthRateLimiter,
|
||||
@@ -126,66 +125,6 @@ pub fn configure_tls(
|
||||
})
|
||||
}
|
||||
|
||||
/// Channel binding parameter
|
||||
///
|
||||
/// <https://www.rfc-editor.org/rfc/rfc5929#section-4>
|
||||
/// Description: The hash of the TLS server's certificate as it
|
||||
/// appears, octet for octet, in the server's Certificate message. Note
|
||||
/// that the Certificate message contains a certificate_list, in which
|
||||
/// the first element is the server's certificate.
|
||||
///
|
||||
/// The hash function is to be selected as follows:
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function, and that hash function is either MD5 or SHA-1, then use SHA-256;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function and that hash function neither MD5 nor SHA-1, then use
|
||||
/// the hash function associated with the certificate's
|
||||
/// signatureAlgorithm;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses no hash functions or
|
||||
/// uses multiple hash functions, then this channel binding type's
|
||||
/// channel bindings are undefined at this time (updates to is channel
|
||||
/// binding type may occur to address this issue if it ever arises).
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum TlsServerEndPoint {
|
||||
Sha256([u8; 32]),
|
||||
Undefined,
|
||||
}
|
||||
|
||||
impl TlsServerEndPoint {
|
||||
pub fn new(cert: &CertificateDer) -> anyhow::Result<Self> {
|
||||
let sha256_oids = [
|
||||
// I'm explicitly not adding MD5 or SHA1 here... They're bad.
|
||||
oid_registry::OID_SIG_ECDSA_WITH_SHA256,
|
||||
oid_registry::OID_PKCS1_SHA256WITHRSA,
|
||||
];
|
||||
|
||||
let pem = x509_parser::parse_x509_certificate(cert)
|
||||
.context("Failed to parse PEM object from cerficiate")?
|
||||
.1;
|
||||
|
||||
info!(subject = %pem.subject, "parsing TLS certificate");
|
||||
|
||||
let reg = oid_registry::OidRegistry::default().with_all_crypto();
|
||||
let oid = pem.signature_algorithm.oid();
|
||||
let alg = reg.get(oid);
|
||||
if sha256_oids.contains(oid) {
|
||||
let tls_server_end_point: [u8; 32] = Sha256::new().chain_update(cert).finalize().into();
|
||||
info!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
|
||||
Ok(Self::Sha256(tls_server_end_point))
|
||||
} else {
|
||||
error!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), "unknown channel binding");
|
||||
Ok(Self::Undefined)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn supported(&self) -> bool {
|
||||
!matches!(self, TlsServerEndPoint::Undefined)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct CertResolver {
|
||||
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
@@ -16,9 +16,10 @@ use crate::{
|
||||
intern::ProjectIdInt,
|
||||
metrics::ApiLockMetrics,
|
||||
rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token},
|
||||
scram, EndpointCacheKey,
|
||||
EndpointCacheKey,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use proxy_sasl::scram;
|
||||
use std::{hash::Hash, sync::Arc, time::Duration};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
@@ -469,15 +470,15 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
metrics: &'static ApiLockMetrics,
|
||||
) -> prometheus::Result<Self> {
|
||||
Ok(Self {
|
||||
) -> Self {
|
||||
Self {
|
||||
name,
|
||||
node_locks: DashMap::with_shard_amount(shards),
|
||||
config,
|
||||
timeout,
|
||||
epoch,
|
||||
metrics,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, ApiLockError> {
|
||||
@@ -5,7 +5,7 @@ use super::{
|
||||
AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo,
|
||||
};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
|
||||
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, url::ApiUrl};
|
||||
use crate::{auth::IpPattern, cache::Cached};
|
||||
use crate::{
|
||||
console::{
|
||||
@@ -15,6 +15,7 @@ use crate::{
|
||||
BranchId, EndpointId, ProjectId,
|
||||
};
|
||||
use futures::TryFutureExt;
|
||||
use proxy_sasl::scram;
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
use thiserror::Error;
|
||||
use tokio_postgres::{config::SslMode, Client};
|
||||
@@ -13,10 +13,11 @@ use crate::{
|
||||
http,
|
||||
metrics::{CacheOutcome, Metrics},
|
||||
rate_limiter::WakeComputeRateLimiter,
|
||||
scram, EndpointCacheKey,
|
||||
EndpointCacheKey,
|
||||
};
|
||||
use crate::{cache::Cached, context::RequestMonitoring};
|
||||
use futures::TryFutureExt;
|
||||
use proxy_sasl::scram;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::time::Instant;
|
||||
use tokio_postgres::config::SslMode;
|
||||
@@ -6,6 +6,12 @@ pub mod health_server;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::bail;
|
||||
use bytes::Bytes;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper1::body::Body;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
pub use reqwest::{Request, Response, StatusCode};
|
||||
pub use reqwest_middleware::{ClientWithMiddleware, Error};
|
||||
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
|
||||
@@ -96,6 +102,33 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn parse_json_body_with_limit<D: DeserializeOwned>(
|
||||
mut b: impl Body<Data = Bytes, Error = reqwest::Error> + Unpin,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<D> {
|
||||
// We could use `b.limited().collect().await.to_bytes()` here
|
||||
// but this ends up being slightly more efficient as far as I can tell.
|
||||
|
||||
// check the lower bound of the size hint.
|
||||
// in reqwest, this value is influenced by the Content-Length header.
|
||||
let lower_bound = match usize::try_from(b.size_hint().lower()) {
|
||||
Ok(bound) if bound <= limit => bound,
|
||||
_ => bail!("content length exceeds limit"),
|
||||
};
|
||||
let mut bytes = Vec::with_capacity(lower_bound);
|
||||
|
||||
while let Some(frame) = b.frame().await.transpose()? {
|
||||
if let Ok(data) = frame.into_data() {
|
||||
if bytes.len() + data.len() > limit {
|
||||
bail!("content length exceeds limit")
|
||||
}
|
||||
bytes.extend_from_slice(&data);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(serde_json::from_slice::<D>(&bytes)?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -21,13 +21,13 @@ pub mod intern;
|
||||
pub mod jemalloc;
|
||||
pub mod logging;
|
||||
pub mod metrics;
|
||||
pub mod parse;
|
||||
// pub mod parse;
|
||||
pub mod protocol2;
|
||||
pub mod proxy;
|
||||
pub mod rate_limiter;
|
||||
pub mod redis;
|
||||
pub mod sasl;
|
||||
pub mod scram;
|
||||
// pub mod sasl;
|
||||
// pub mod scram;
|
||||
pub mod serverless;
|
||||
pub mod stream;
|
||||
pub mod url;
|
||||
@@ -2,13 +2,14 @@ use std::sync::{Arc, OnceLock};
|
||||
|
||||
use lasso::ThreadedRodeo;
|
||||
use measured::{
|
||||
label::{FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet},
|
||||
label::StaticLabelSet,
|
||||
metric::{histogram::Thresholds, name::MetricName},
|
||||
Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
|
||||
LabelGroup, MetricGroup,
|
||||
Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
|
||||
MetricGroup,
|
||||
};
|
||||
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
|
||||
|
||||
use proxy_sasl::scram::threadpool::ThreadPoolMetrics;
|
||||
use tokio::time::{self, Instant};
|
||||
|
||||
use crate::console::messages::ColdStartInfo;
|
||||
@@ -546,78 +547,3 @@ pub enum RedisEventsCount {
|
||||
PasswordUpdate,
|
||||
AllowedIpsUpdate,
|
||||
}
|
||||
|
||||
pub struct ThreadPoolWorkers(usize);
|
||||
pub struct ThreadPoolWorkerId(pub usize);
|
||||
|
||||
impl LabelValue for ThreadPoolWorkerId {
|
||||
fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
|
||||
v.write_int(self.0 as i64)
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelGroup for ThreadPoolWorkerId {
|
||||
fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
|
||||
v.write_value(LabelName::from_str("worker"), self);
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelGroupSet for ThreadPoolWorkers {
|
||||
type Group<'a> = ThreadPoolWorkerId;
|
||||
|
||||
fn cardinality(&self) -> Option<usize> {
|
||||
Some(self.0)
|
||||
}
|
||||
|
||||
fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
|
||||
Some(value)
|
||||
}
|
||||
|
||||
fn decode_dense(&self, value: usize) -> Self::Group<'_> {
|
||||
ThreadPoolWorkerId(value)
|
||||
}
|
||||
|
||||
type Unique = usize;
|
||||
|
||||
fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
|
||||
Some(value.0)
|
||||
}
|
||||
|
||||
fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
|
||||
ThreadPoolWorkerId(*value)
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelSet for ThreadPoolWorkers {
|
||||
type Value<'a> = ThreadPoolWorkerId;
|
||||
|
||||
fn dynamic_cardinality(&self) -> Option<usize> {
|
||||
Some(self.0)
|
||||
}
|
||||
|
||||
fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
|
||||
(value.0 < self.0).then_some(value.0)
|
||||
}
|
||||
|
||||
fn decode(&self, value: usize) -> Self::Value<'_> {
|
||||
ThreadPoolWorkerId(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedCardinalitySet for ThreadPoolWorkers {
|
||||
fn cardinality(&self) -> usize {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new(workers: usize))]
|
||||
pub struct ThreadPoolMetrics {
|
||||
pub injector_queue_depth: Gauge,
|
||||
#[metric(init = GaugeVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
pub worker_queue_depth: GaugeVec<ThreadPoolWorkers>,
|
||||
#[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
|
||||
#[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
|
||||
}
|
||||
@@ -16,9 +16,10 @@ use crate::console::messages::{ConsoleError, Details, MetricsAuxInfo, Status};
|
||||
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
|
||||
use crate::console::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::ErrorKind;
|
||||
use crate::{http, sasl, scram, BranchId, EndpointId, ProjectId};
|
||||
use crate::{http, BranchId, EndpointId, ProjectId};
|
||||
use anyhow::{bail, Context};
|
||||
use async_trait::async_trait;
|
||||
use proxy_sasl::{sasl, scram};
|
||||
use retry::{retry_after, ShouldRetryWakeCompute};
|
||||
use rstest::rstest;
|
||||
use rustls::pki_types;
|
||||
@@ -137,7 +138,7 @@ struct Scram(scram::ServerSecret);
|
||||
|
||||
impl Scram {
|
||||
async fn new(password: &str) -> anyhow::Result<Self> {
|
||||
let secret = scram::ServerSecret::build(password)
|
||||
let secret = scram::ServerSecret::build_test_secret(password)
|
||||
.await
|
||||
.context("failed to generate scram secret")?;
|
||||
Ok(Scram(secret))
|
||||
@@ -79,11 +79,11 @@ impl PoolingBackend {
|
||||
)
|
||||
.await?;
|
||||
let res = match auth_outcome {
|
||||
crate::sasl::Outcome::Success(key) => {
|
||||
proxy_sasl::sasl::Outcome::Success(key) => {
|
||||
info!("user successfully authenticated");
|
||||
Ok(key)
|
||||
}
|
||||
crate::sasl::Outcome::Failure(reason) => {
|
||||
proxy_sasl::sasl::Outcome::Failure(reason) => {
|
||||
info!("auth backend failed with an error: {reason}");
|
||||
Err(AuthError::auth_failed(&*conn_info.user_info.user))
|
||||
}
|
||||
@@ -1,10 +1,10 @@
|
||||
use crate::config::TlsServerEndPoint;
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::metrics::Metrics;
|
||||
use bytes::BytesMut;
|
||||
|
||||
use pq_proto::framed::{ConnectionError, Framed};
|
||||
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};
|
||||
use proxy_sasl::scram::TlsServerEndPoint;
|
||||
use rustls::ServerConfig;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
29
proxy/pg_sni_router/Cargo.toml
Normal file
29
proxy/pg_sni_router/Cargo.toml
Normal file
@@ -0,0 +1,29 @@
|
||||
[package]
|
||||
name = "pg_sni_router"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
proxy-sasl = { version = "0.1", path = "../sasl" }
|
||||
proxy-core = { version = "0.1", path = "../core" }
|
||||
|
||||
anyhow.workspace = true
|
||||
clap.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
itertools.workspace = true
|
||||
pq_proto.workspace = true
|
||||
rustls-pemfile.workspace = true
|
||||
rustls.workspace = true
|
||||
socket2.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio = { workspace = true, features = ["signal"] }
|
||||
tracing-utils.workspace = true
|
||||
tracing.workspace = true
|
||||
utils.workspace = true
|
||||
uuid.workspace = true
|
||||
@@ -7,17 +7,18 @@ use std::{net::SocketAddr, sync::Arc};
|
||||
|
||||
use futures::future::Either;
|
||||
use itertools::Itertools;
|
||||
use proxy::config::TlsServerEndPoint;
|
||||
use proxy::context::RequestMonitoring;
|
||||
use proxy::metrics::{Metrics, ThreadPoolMetrics};
|
||||
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
|
||||
use proxy_core::context::RequestMonitoring;
|
||||
use proxy_core::metrics::Metrics;
|
||||
use proxy_core::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
|
||||
use proxy_sasl::scram::threadpool::ThreadPoolMetrics;
|
||||
use proxy_sasl::scram::TlsServerEndPoint;
|
||||
use rustls::pki_types::PrivateKeyDer;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use clap::Arg;
|
||||
use futures::TryFutureExt;
|
||||
use proxy::stream::{PqStream, Stream};
|
||||
use proxy_core::stream::{PqStream, Stream};
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -62,7 +63,7 @@ fn cli() -> clap::Command {
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let _logging_guard = proxy::logging::init().await?;
|
||||
let _logging_guard = proxy_core::logging::init().await?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
@@ -133,14 +134,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
));
|
||||
let signals_task = tokio::spawn(proxy::handle_signals(cancellation_token));
|
||||
let signals_task = tokio::spawn(proxy_core::handle_signals(cancellation_token));
|
||||
|
||||
// the signal task cant ever succeed.
|
||||
// the main task can error, or can succeed on cancellation.
|
||||
// we want to immediately exit on either of these cases
|
||||
let signal = match futures::future::select(signals_task, main).await {
|
||||
Either::Left((res, _)) => proxy::flatten_err(res)?,
|
||||
Either::Right((res, _)) => return proxy::flatten_err(res),
|
||||
Either::Left((res, _)) => proxy_core::flatten_err(res)?,
|
||||
Either::Right((res, _)) => return proxy_core::flatten_err(res),
|
||||
};
|
||||
|
||||
// maintenance tasks return `Infallible` success values, this is an impossible value
|
||||
@@ -180,7 +181,7 @@ async fn task_main(
|
||||
let ctx = RequestMonitoring::new(
|
||||
session_id,
|
||||
peer_addr.ip(),
|
||||
proxy::metrics::Protocol::SniRouter,
|
||||
proxy_core::metrics::Protocol::SniRouter,
|
||||
"sni",
|
||||
);
|
||||
handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await
|
||||
@@ -249,7 +250,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
"unexpected startup packet, rejecting connection"
|
||||
);
|
||||
stream
|
||||
.throw_error_str(ERR_INSECURE_CONNECTION, proxy::error::ErrorKind::User)
|
||||
.throw_error_str(ERR_INSECURE_CONNECTION, proxy_core::error::ErrorKind::User)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
34
proxy/proxy/Cargo.toml
Normal file
34
proxy/proxy/Cargo.toml
Normal file
@@ -0,0 +1,34 @@
|
||||
[package]
|
||||
name = "proxy"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
proxy-sasl = { version = "0.1", path = "../sasl" }
|
||||
proxy-core = { version = "0.1", path = "../core" }
|
||||
|
||||
anyhow.workspace = true
|
||||
aws-config.workspace = true
|
||||
clap.workspace = true
|
||||
futures.workspace = true
|
||||
git-version.workspace = true
|
||||
humantime.workspace = true
|
||||
itertools.workspace = true
|
||||
metrics.workspace = true
|
||||
pq_proto.workspace = true
|
||||
remote_storage = { version = "0.1", path = "../../libs/remote_storage/" }
|
||||
rustls-pemfile.workspace = true
|
||||
rustls.workspace = true
|
||||
socket2.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tokio = { workspace = true, features = ["signal"] }
|
||||
tracing-utils.workspace = true
|
||||
tracing.workspace = true
|
||||
utils.workspace = true
|
||||
uuid.workspace = true
|
||||
@@ -7,37 +7,37 @@ use aws_config::provider_config::ProviderConfig;
|
||||
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
|
||||
use aws_config::Region;
|
||||
use futures::future::Either;
|
||||
use proxy::auth;
|
||||
use proxy::auth::backend::AuthRateLimiter;
|
||||
use proxy::auth::backend::MaybeOwned;
|
||||
use proxy::cancellation::CancelMap;
|
||||
use proxy::cancellation::CancellationHandler;
|
||||
use proxy::config::remote_storage_from_toml;
|
||||
use proxy::config::AuthenticationConfig;
|
||||
use proxy::config::CacheOptions;
|
||||
use proxy::config::HttpConfig;
|
||||
use proxy::config::ProjectInfoCacheOptions;
|
||||
use proxy::console;
|
||||
use proxy::context::parquet::ParquetUploadArgs;
|
||||
use proxy::http;
|
||||
use proxy::http::health_server::AppMetrics;
|
||||
use proxy::metrics::Metrics;
|
||||
use proxy::rate_limiter::EndpointRateLimiter;
|
||||
use proxy::rate_limiter::LeakyBucketConfig;
|
||||
use proxy::rate_limiter::RateBucketInfo;
|
||||
use proxy::rate_limiter::WakeComputeRateLimiter;
|
||||
use proxy::redis::cancellation_publisher::RedisPublisherClient;
|
||||
use proxy::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
|
||||
use proxy::redis::elasticache;
|
||||
use proxy::redis::notifications;
|
||||
use proxy::scram::threadpool::ThreadPool;
|
||||
use proxy::serverless::cancel_set::CancelSet;
|
||||
use proxy::serverless::GlobalConnPoolOptions;
|
||||
use proxy::usage_metrics;
|
||||
use proxy_core::auth;
|
||||
use proxy_core::auth::backend::AuthRateLimiter;
|
||||
use proxy_core::auth::backend::MaybeOwned;
|
||||
use proxy_core::cancellation::CancelMap;
|
||||
use proxy_core::cancellation::CancellationHandler;
|
||||
use proxy_core::config::remote_storage_from_toml;
|
||||
use proxy_core::config::AuthenticationConfig;
|
||||
use proxy_core::config::CacheOptions;
|
||||
use proxy_core::config::HttpConfig;
|
||||
use proxy_core::config::ProjectInfoCacheOptions;
|
||||
use proxy_core::console;
|
||||
use proxy_core::context::parquet::ParquetUploadArgs;
|
||||
use proxy_core::http;
|
||||
use proxy_core::http::health_server::AppMetrics;
|
||||
use proxy_core::metrics::Metrics;
|
||||
use proxy_core::rate_limiter::EndpointRateLimiter;
|
||||
use proxy_core::rate_limiter::LeakyBucketConfig;
|
||||
use proxy_core::rate_limiter::RateBucketInfo;
|
||||
use proxy_core::rate_limiter::WakeComputeRateLimiter;
|
||||
use proxy_core::redis::cancellation_publisher::RedisPublisherClient;
|
||||
use proxy_core::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
|
||||
use proxy_core::redis::elasticache;
|
||||
use proxy_core::redis::notifications;
|
||||
use proxy_core::serverless::cancel_set::CancelSet;
|
||||
use proxy_core::serverless::GlobalConnPoolOptions;
|
||||
use proxy_core::usage_metrics;
|
||||
|
||||
use anyhow::bail;
|
||||
use proxy::config::{self, ProxyConfig};
|
||||
use proxy::serverless;
|
||||
use proxy_core::config::{self, ProxyConfig};
|
||||
use proxy_core::serverless;
|
||||
use proxy_sasl::scram::threadpool::ThreadPool;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::pin;
|
||||
@@ -268,7 +268,7 @@ struct SqlOverHttpArgs {
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let _logging_guard = proxy::logging::init().await?;
|
||||
let _logging_guard = proxy_core::logging::init().await?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
@@ -279,7 +279,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
build_tag: BUILD_TAG,
|
||||
});
|
||||
|
||||
let jemalloc = match proxy::jemalloc::MetricRecorder::new() {
|
||||
let jemalloc = match proxy_core::jemalloc::MetricRecorder::new() {
|
||||
Ok(t) => Some(t),
|
||||
Err(e) => {
|
||||
tracing::error!(error = ?e, "could not start jemalloc metrics loop");
|
||||
@@ -394,7 +394,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
>::new(
|
||||
cancel_map.clone(),
|
||||
redis_publisher,
|
||||
proxy::metrics::CancellationSource::FromClient,
|
||||
proxy_core::metrics::CancellationSource::FromClient,
|
||||
));
|
||||
|
||||
// bit of a hack - find the min rps and max rps supported and turn it into
|
||||
@@ -419,7 +419,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// client facing tasks. these will exit on error or on cancellation
|
||||
// cancellation returns Ok(())
|
||||
let mut client_tasks = JoinSet::new();
|
||||
client_tasks.spawn(proxy::proxy::task_main(
|
||||
client_tasks.spawn(proxy_core::proxy::task_main(
|
||||
config,
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
@@ -443,20 +443,20 @@ async fn main() -> anyhow::Result<()> {
|
||||
));
|
||||
}
|
||||
|
||||
client_tasks.spawn(proxy::context::parquet::worker(
|
||||
client_tasks.spawn(proxy_core::context::parquet::worker(
|
||||
cancellation_token.clone(),
|
||||
args.parquet_upload,
|
||||
));
|
||||
|
||||
// maintenance tasks. these never return unless there's an error
|
||||
let mut maintenance_tasks = JoinSet::new();
|
||||
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone()));
|
||||
maintenance_tasks.spawn(proxy_core::handle_signals(cancellation_token.clone()));
|
||||
maintenance_tasks.spawn(http::health_server::task_main(
|
||||
http_listener,
|
||||
AppMetrics {
|
||||
jemalloc,
|
||||
neon_metrics,
|
||||
proxy: proxy::metrics::Metrics::get(),
|
||||
proxy: proxy_core::metrics::Metrics::get(),
|
||||
},
|
||||
));
|
||||
maintenance_tasks.spawn(console::mgmt::task_main(mgmt_listener));
|
||||
@@ -471,7 +471,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
if let auth::BackendType::Console(api, _) = &config.auth_backend {
|
||||
if let proxy::console::provider::ConsoleBackend::Console(api) = &**api {
|
||||
if let proxy_core::console::provider::ConsoleBackend::Console(api) = &**api {
|
||||
match (redis_notifications_client, regional_redis_client.clone()) {
|
||||
(None, None) => {}
|
||||
(client1, client2) => {
|
||||
@@ -516,11 +516,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
.await
|
||||
{
|
||||
// exit immediately on maintenance task completion
|
||||
Either::Left((Some(res), _)) => break proxy::flatten_err(res)?,
|
||||
Either::Left((Some(res), _)) => break proxy_core::flatten_err(res)?,
|
||||
// exit with error immediately if all maintenance tasks have ceased (should be caught by branch above)
|
||||
Either::Left((None, _)) => bail!("no maintenance tasks running. invalid state"),
|
||||
// exit immediately on client task error
|
||||
Either::Right((Some(res), _)) => proxy::flatten_err(res)?,
|
||||
Either::Right((Some(res), _)) => proxy_core::flatten_err(res)?,
|
||||
// exit if all our client tasks have shutdown gracefully
|
||||
Either::Right((None, _)) => return Ok(()),
|
||||
}
|
||||
@@ -607,7 +607,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
timeout,
|
||||
epoch,
|
||||
&Metrics::get().wake_compute_lock,
|
||||
)?));
|
||||
)));
|
||||
tokio::spawn(locks.garbage_collect_worker());
|
||||
|
||||
let url = args.auth_endpoint.parse()?;
|
||||
@@ -658,7 +658,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
timeout,
|
||||
epoch,
|
||||
&Metrics::get().proxy.connect_compute_lock,
|
||||
)?;
|
||||
);
|
||||
|
||||
let http_config = HttpConfig {
|
||||
pool_options: GlobalConnPoolOptions {
|
||||
@@ -707,7 +707,7 @@ mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use clap::Parser;
|
||||
use proxy::rate_limiter::RateBucketInfo;
|
||||
use proxy_core::rate_limiter::RateBucketInfo;
|
||||
|
||||
#[test]
|
||||
fn parse_endpoint_rps_limit() {
|
||||
37
proxy/sasl/Cargo.toml
Normal file
37
proxy/sasl/Cargo.toml
Normal file
@@ -0,0 +1,37 @@
|
||||
[package]
|
||||
name = "proxy-sasl"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
ahash.workspace = true
|
||||
anyhow.workspace = true
|
||||
base64.workspace = true
|
||||
bytes = { workspace = true, features = ["serde"] }
|
||||
crossbeam-deque.workspace = true
|
||||
hmac.workspace = true
|
||||
itertools.workspace = true
|
||||
lasso = { workspace = true, features = ["multi-threaded"] }
|
||||
measured = { workspace = true, features = ["lasso"] }
|
||||
parking_lot.workspace = true
|
||||
pq_proto.workspace = true
|
||||
rand.workspace = true
|
||||
rustls.workspace = true
|
||||
sha2 = { workspace = true, features = ["asm", "oid"] }
|
||||
subtle.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["signal"] }
|
||||
tracing.workspace = true
|
||||
x509-parser.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
pbkdf2 = { workspace = true, features = ["simple", "std"] }
|
||||
uuid.workspace = true
|
||||
3
proxy/sasl/src/lib.rs
Normal file
3
proxy/sasl/src/lib.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
mod parse;
|
||||
pub mod sasl;
|
||||
pub mod scram;
|
||||
43
proxy/sasl/src/parse.rs
Normal file
43
proxy/sasl/src/parse.rs
Normal file
@@ -0,0 +1,43 @@
|
||||
//! Small parsing helpers.
|
||||
|
||||
use std::ffi::CStr;
|
||||
|
||||
pub fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> {
|
||||
let cstr = CStr::from_bytes_until_nul(bytes).ok()?;
|
||||
let (_, other) = bytes.split_at(cstr.to_bytes_with_nul().len());
|
||||
Some((cstr, other))
|
||||
}
|
||||
|
||||
/// See <https://doc.rust-lang.org/std/primitive.slice.html#method.split_array_ref>.
|
||||
pub fn split_at_const<const N: usize>(bytes: &[u8]) -> Option<(&[u8; N], &[u8])> {
|
||||
(bytes.len() >= N).then(|| {
|
||||
let (head, tail) = bytes.split_at(N);
|
||||
(head.try_into().unwrap(), tail)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_split_cstr() {
|
||||
assert!(split_cstr(b"").is_none());
|
||||
assert!(split_cstr(b"foo").is_none());
|
||||
|
||||
let (cstr, rest) = split_cstr(b"\0").expect("uh-oh");
|
||||
assert_eq!(cstr.to_bytes(), b"");
|
||||
assert_eq!(rest, b"");
|
||||
|
||||
let (cstr, rest) = split_cstr(b"foo\0bar").expect("uh-oh");
|
||||
assert_eq!(cstr.to_bytes(), b"foo");
|
||||
assert_eq!(rest, b"bar");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_split_at_const() {
|
||||
assert!(split_at_const::<0>(b"").is_some());
|
||||
assert!(split_at_const::<1>(b"").is_none());
|
||||
assert!(matches!(split_at_const::<1>(b"ok"), Some((b"o", b"k"))));
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,7 @@ mod channel_binding;
|
||||
mod messages;
|
||||
mod stream;
|
||||
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
// use crate::error::{ReportableError, UserFacingError};
|
||||
use std::io;
|
||||
use thiserror::Error;
|
||||
|
||||
@@ -40,29 +40,29 @@ pub enum Error {
|
||||
Io(#[from] io::Error),
|
||||
}
|
||||
|
||||
impl UserFacingError for Error {
|
||||
fn to_string_client(&self) -> String {
|
||||
use Error::*;
|
||||
match self {
|
||||
ChannelBindingFailed(m) => m.to_string(),
|
||||
ChannelBindingBadMethod(m) => format!("unsupported channel binding method {m}"),
|
||||
_ => "authentication protocol violation".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
// impl UserFacingError for Error {
|
||||
// fn to_string_client(&self) -> String {
|
||||
// use Error::*;
|
||||
// match self {
|
||||
// ChannelBindingFailed(m) => m.to_string(),
|
||||
// ChannelBindingBadMethod(m) => format!("unsupported channel binding method {m}"),
|
||||
// _ => "authentication protocol violation".to_string(),
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
impl ReportableError for Error {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
Error::ChannelBindingFailed(_) => crate::error::ErrorKind::User,
|
||||
Error::ChannelBindingBadMethod(_) => crate::error::ErrorKind::User,
|
||||
Error::BadClientMessage(_) => crate::error::ErrorKind::User,
|
||||
Error::MissingBinding => crate::error::ErrorKind::Service,
|
||||
Error::Base64(_) => crate::error::ErrorKind::ControlPlane,
|
||||
Error::Io(_) => crate::error::ErrorKind::ClientDisconnect,
|
||||
}
|
||||
}
|
||||
}
|
||||
// impl ReportableError for Error {
|
||||
// fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
// match self {
|
||||
// Error::ChannelBindingFailed(_) => crate::error::ErrorKind::User,
|
||||
// Error::ChannelBindingBadMethod(_) => crate::error::ErrorKind::User,
|
||||
// Error::BadClientMessage(_) => crate::error::ErrorKind::User,
|
||||
// Error::MissingBinding => crate::error::ErrorKind::Service,
|
||||
// Error::Base64(_) => crate::error::ErrorKind::ControlPlane,
|
||||
// Error::Io(_) => crate::error::ErrorKind::ClientDisconnect,
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
/// A convenient result type for SASL exchange.
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -1,7 +1,10 @@
|
||||
//! Abstraction for the string-oriented SASL protocols.
|
||||
|
||||
use super::{messages::ServerMessage, Mechanism};
|
||||
use crate::stream::PqStream;
|
||||
use pq_proto::{
|
||||
framed::{ConnectionError, Framed},
|
||||
FeMessage, ProtocolError,
|
||||
};
|
||||
use std::io;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::info;
|
||||
@@ -9,7 +12,7 @@ use tracing::info;
|
||||
/// Abstracts away all peculiarities of the libpq's protocol.
|
||||
pub struct SaslStream<'a, S> {
|
||||
/// The underlying stream.
|
||||
stream: &'a mut PqStream<S>,
|
||||
stream: &'a mut Framed<S>,
|
||||
/// Current password message we received from client.
|
||||
current: bytes::Bytes,
|
||||
/// First SASL message produced by client.
|
||||
@@ -17,7 +20,7 @@ pub struct SaslStream<'a, S> {
|
||||
}
|
||||
|
||||
impl<'a, S> SaslStream<'a, S> {
|
||||
pub fn new(stream: &'a mut PqStream<S>, first: &'a str) -> Self {
|
||||
pub fn new(stream: &'a mut Framed<S>, first: &'a str) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
current: bytes::Bytes::new(),
|
||||
@@ -26,6 +29,27 @@ impl<'a, S> SaslStream<'a, S> {
|
||||
}
|
||||
}
|
||||
|
||||
fn err_connection() -> io::Error {
|
||||
io::Error::new(io::ErrorKind::ConnectionAborted, "connection is lost")
|
||||
}
|
||||
|
||||
pub async fn read_password_message<S: AsyncRead + Unpin>(
|
||||
framed: &mut Framed<S>,
|
||||
) -> io::Result<bytes::Bytes> {
|
||||
let msg = framed
|
||||
.read_message()
|
||||
.await
|
||||
.map_err(ConnectionError::into_io_error)?
|
||||
.ok_or_else(err_connection)?;
|
||||
match msg {
|
||||
FeMessage::PasswordMessage(msg) => Ok(msg),
|
||||
bad => Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!("unexpected message type: {:?}", bad),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + Unpin> SaslStream<'_, S> {
|
||||
// Receive a new SASL message from the client.
|
||||
async fn recv(&mut self) -> io::Result<&str> {
|
||||
@@ -33,7 +57,7 @@ impl<S: AsyncRead + Unpin> SaslStream<'_, S> {
|
||||
return Ok(first);
|
||||
}
|
||||
|
||||
self.current = self.stream.read_password_message().await?;
|
||||
self.current = read_password_message(self.stream).await?;
|
||||
let s = std::str::from_utf8(&self.current)
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "bad encoding"))?;
|
||||
|
||||
@@ -44,7 +68,10 @@ impl<S: AsyncRead + Unpin> SaslStream<'_, S> {
|
||||
impl<S: AsyncWrite + Unpin> SaslStream<'_, S> {
|
||||
// Send a SASL message to the client.
|
||||
async fn send(&mut self, msg: &ServerMessage<&str>) -> io::Result<()> {
|
||||
self.stream.write_message(&msg.to_reply()).await?;
|
||||
self.stream
|
||||
.write_message(&msg.to_reply())
|
||||
.map_err(ProtocolError::into_io_error)?;
|
||||
self.stream.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -15,12 +15,16 @@ mod secret;
|
||||
mod signature;
|
||||
pub mod threadpool;
|
||||
|
||||
use anyhow::Context;
|
||||
pub use exchange::{exchange, Exchange};
|
||||
pub use key::ScramKey;
|
||||
use rustls::pki_types::CertificateDer;
|
||||
pub use secret::ServerSecret;
|
||||
|
||||
use hmac::{Hmac, Mac};
|
||||
use sha2::{Digest, Sha256};
|
||||
use tracing::{error, info};
|
||||
use x509_parser::oid_registry;
|
||||
|
||||
const SCRAM_SHA_256: &str = "SCRAM-SHA-256";
|
||||
const SCRAM_SHA_256_PLUS: &str = "SCRAM-SHA-256-PLUS";
|
||||
@@ -57,12 +61,71 @@ fn sha256<'a>(parts: impl IntoIterator<Item = &'a [u8]>) -> [u8; 32] {
|
||||
hasher.finalize().into()
|
||||
}
|
||||
|
||||
/// Channel binding parameter
|
||||
///
|
||||
/// <https://www.rfc-editor.org/rfc/rfc5929#section-4>
|
||||
/// Description: The hash of the TLS server's certificate as it
|
||||
/// appears, octet for octet, in the server's Certificate message. Note
|
||||
/// that the Certificate message contains a certificate_list, in which
|
||||
/// the first element is the server's certificate.
|
||||
///
|
||||
/// The hash function is to be selected as follows:
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function, and that hash function is either MD5 or SHA-1, then use SHA-256;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses a single hash
|
||||
/// function and that hash function neither MD5 nor SHA-1, then use
|
||||
/// the hash function associated with the certificate's
|
||||
/// signatureAlgorithm;
|
||||
///
|
||||
/// * if the certificate's signatureAlgorithm uses no hash functions or
|
||||
/// uses multiple hash functions, then this channel binding type's
|
||||
/// channel bindings are undefined at this time (updates to is channel
|
||||
/// binding type may occur to address this issue if it ever arises).
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum TlsServerEndPoint {
|
||||
Sha256([u8; 32]),
|
||||
Undefined,
|
||||
}
|
||||
|
||||
impl TlsServerEndPoint {
|
||||
pub fn new(cert: &CertificateDer) -> anyhow::Result<Self> {
|
||||
let sha256_oids = [
|
||||
// I'm explicitly not adding MD5 or SHA1 here... They're bad.
|
||||
oid_registry::OID_SIG_ECDSA_WITH_SHA256,
|
||||
oid_registry::OID_PKCS1_SHA256WITHRSA,
|
||||
];
|
||||
|
||||
let pem = x509_parser::parse_x509_certificate(cert)
|
||||
.context("Failed to parse PEM object from cerficiate")?
|
||||
.1;
|
||||
|
||||
info!(subject = %pem.subject, "parsing TLS certificate");
|
||||
|
||||
let reg = oid_registry::OidRegistry::default().with_all_crypto();
|
||||
let oid = pem.signature_algorithm.oid();
|
||||
let alg = reg.get(oid);
|
||||
if sha256_oids.contains(oid) {
|
||||
let tls_server_end_point: [u8; 32] = Sha256::new().chain_update(cert).finalize().into();
|
||||
info!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
|
||||
Ok(Self::Sha256(tls_server_end_point))
|
||||
} else {
|
||||
error!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), "unknown channel binding");
|
||||
Ok(Self::Undefined)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn supported(&self) -> bool {
|
||||
!matches!(self, TlsServerEndPoint::Undefined)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
intern::EndpointIdInt,
|
||||
sasl::{Mechanism, Step},
|
||||
EndpointId,
|
||||
scram::TlsServerEndPoint,
|
||||
};
|
||||
|
||||
use super::{threadpool::ThreadPool, Exchange, ServerSecret};
|
||||
@@ -79,11 +142,7 @@ mod tests {
|
||||
const NONCE: [u8; 18] = [
|
||||
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
|
||||
];
|
||||
let mut exchange = Exchange::new(
|
||||
&secret,
|
||||
|| NONCE,
|
||||
crate::config::TlsServerEndPoint::Undefined,
|
||||
);
|
||||
let mut exchange = Exchange::new(&secret, || NONCE, TlsServerEndPoint::Undefined);
|
||||
|
||||
let client_first = "n,,n=user,r=rOprNGfwEbeRWgbNEkqO";
|
||||
let client_final = "c=biws,r=rOprNGfwEbeRWgbNEkqOAQIDBAUGBwgJCgsMDQ4PEBES,p=rw1r5Kph5ThxmaUBC2GAQ6MfXbPnNkFiTIvdb/Rear0=";
|
||||
@@ -120,11 +179,11 @@ mod tests {
|
||||
|
||||
async fn run_round_trip_test(server_password: &str, client_password: &str) {
|
||||
let pool = ThreadPool::new(1);
|
||||
let ep = "foo";
|
||||
|
||||
let ep = EndpointId::from("foo");
|
||||
let ep = EndpointIdInt::from(ep);
|
||||
|
||||
let scram_secret = ServerSecret::build(server_password).await.unwrap();
|
||||
let scram_secret = ServerSecret::build_test_secret(server_password)
|
||||
.await
|
||||
.unwrap();
|
||||
let outcome = super::exchange(&pool, ep, &scram_secret, client_password.as_bytes())
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Implementation of the SCRAM authentication algorithm.
|
||||
|
||||
use std::convert::Infallible;
|
||||
use std::hash::Hash;
|
||||
|
||||
use hmac::{Hmac, Mac};
|
||||
use sha2::Sha256;
|
||||
@@ -13,8 +14,6 @@ use super::secret::ServerSecret;
|
||||
use super::signature::SignatureBuilder;
|
||||
use super::threadpool::ThreadPool;
|
||||
use super::ScramKey;
|
||||
use crate::config;
|
||||
use crate::intern::EndpointIdInt;
|
||||
use crate::sasl::{self, ChannelBinding, Error as SaslError};
|
||||
|
||||
/// The only channel binding mode we currently support.
|
||||
@@ -59,14 +58,14 @@ enum ExchangeState {
|
||||
pub struct Exchange<'a> {
|
||||
state: ExchangeState,
|
||||
secret: &'a ServerSecret,
|
||||
tls_server_end_point: config::TlsServerEndPoint,
|
||||
tls_server_end_point: super::TlsServerEndPoint,
|
||||
}
|
||||
|
||||
impl<'a> Exchange<'a> {
|
||||
pub fn new(
|
||||
secret: &'a ServerSecret,
|
||||
nonce: fn() -> [u8; SCRAM_RAW_NONCE_LEN],
|
||||
tls_server_end_point: config::TlsServerEndPoint,
|
||||
tls_server_end_point: super::TlsServerEndPoint,
|
||||
) -> Self {
|
||||
Self {
|
||||
state: ExchangeState::Initial(SaslInitial { nonce }),
|
||||
@@ -77,15 +76,15 @@ impl<'a> Exchange<'a> {
|
||||
}
|
||||
|
||||
// copied from <https://github.com/neondatabase/rust-postgres/blob/20031d7a9ee1addeae6e0968e3899ae6bf01cee2/postgres-protocol/src/authentication/sasl.rs#L236-L248>
|
||||
async fn derive_client_key(
|
||||
pool: &ThreadPool,
|
||||
endpoint: EndpointIdInt,
|
||||
async fn derive_client_key<K: Hash + Send + 'static>(
|
||||
pool: &ThreadPool<K>,
|
||||
concurrency_key: K,
|
||||
password: &[u8],
|
||||
salt: &[u8],
|
||||
iterations: u32,
|
||||
) -> ScramKey {
|
||||
let salted_password = pool
|
||||
.spawn_job(endpoint, Pbkdf2::start(password, salt, iterations))
|
||||
.spawn_job(concurrency_key, Pbkdf2::start(password, salt, iterations))
|
||||
.await
|
||||
.expect("job should not be cancelled");
|
||||
|
||||
@@ -101,14 +100,15 @@ async fn derive_client_key(
|
||||
make_key(b"Client Key").into()
|
||||
}
|
||||
|
||||
pub async fn exchange(
|
||||
pool: &ThreadPool,
|
||||
endpoint: EndpointIdInt,
|
||||
pub async fn exchange<K: Hash + Send + 'static>(
|
||||
pool: &ThreadPool<K>,
|
||||
concurrency_key: K,
|
||||
secret: &ServerSecret,
|
||||
password: &[u8],
|
||||
) -> sasl::Result<sasl::Outcome<super::ScramKey>> {
|
||||
let salt = base64::decode(&secret.salt_base64)?;
|
||||
let client_key = derive_client_key(pool, endpoint, password, &salt, secret.iterations).await;
|
||||
let client_key =
|
||||
derive_client_key(pool, concurrency_key, password, &salt, secret.iterations).await;
|
||||
|
||||
if secret.is_password_invalid(&client_key).into() {
|
||||
Ok(sasl::Outcome::Failure("password doesn't match"))
|
||||
@@ -121,7 +121,7 @@ impl SaslInitial {
|
||||
fn transition(
|
||||
&self,
|
||||
secret: &ServerSecret,
|
||||
tls_server_end_point: &config::TlsServerEndPoint,
|
||||
tls_server_end_point: &super::TlsServerEndPoint,
|
||||
input: &str,
|
||||
) -> sasl::Result<sasl::Step<SaslSentInner, Infallible>> {
|
||||
let client_first_message = ClientFirstMessage::parse(input)
|
||||
@@ -156,7 +156,7 @@ impl SaslSentInner {
|
||||
fn transition(
|
||||
&self,
|
||||
secret: &ServerSecret,
|
||||
tls_server_end_point: &config::TlsServerEndPoint,
|
||||
tls_server_end_point: &super::TlsServerEndPoint,
|
||||
input: &str,
|
||||
) -> sasl::Result<sasl::Step<Infallible, super::ScramKey>> {
|
||||
let Self {
|
||||
@@ -169,8 +169,8 @@ impl SaslSentInner {
|
||||
.ok_or(SaslError::BadClientMessage("invalid client-final-message"))?;
|
||||
|
||||
let channel_binding = cbind_flag.encode(|_| match tls_server_end_point {
|
||||
config::TlsServerEndPoint::Sha256(x) => Ok(x),
|
||||
config::TlsServerEndPoint::Undefined => Err(SaslError::MissingBinding),
|
||||
super::TlsServerEndPoint::Sha256(x) => Ok(x),
|
||||
super::TlsServerEndPoint::Undefined => Err(SaslError::MissingBinding),
|
||||
})?;
|
||||
|
||||
// This might've been caused by a MITM attack
|
||||
@@ -64,9 +64,7 @@ impl ServerSecret {
|
||||
}
|
||||
|
||||
/// Build a new server secret from the prerequisites.
|
||||
/// XXX: We only use this function in tests.
|
||||
#[cfg(test)]
|
||||
pub async fn build(password: &str) -> Option<Self> {
|
||||
pub async fn build_test_secret(password: &str) -> Option<Self> {
|
||||
Self::parse(&postgres_protocol::password::scram_sha_256(password.as_bytes()).await)
|
||||
}
|
||||
}
|
||||
@@ -4,29 +4,36 @@
|
||||
//! 1. Fairness per endpoint.
|
||||
//! 2. Yield support for high iteration counts.
|
||||
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
use std::{
|
||||
hash::Hash,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
|
||||
use crossbeam_deque::{Injector, Stealer, Worker};
|
||||
use itertools::Itertools;
|
||||
use measured::{
|
||||
label::{FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue},
|
||||
CounterVec, Gauge, GaugeVec, LabelGroup, MetricGroup,
|
||||
};
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
use rand::Rng;
|
||||
use rand::{rngs::SmallRng, SeedableRng};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::{
|
||||
intern::EndpointIdInt,
|
||||
metrics::{ThreadPoolMetrics, ThreadPoolWorkerId},
|
||||
// intern::EndpointIdInt,
|
||||
// metrics::{ThreadPoolMetrics, ThreadPoolWorkerId},
|
||||
scram::countmin::CountMinSketch,
|
||||
};
|
||||
|
||||
use super::pbkdf2::Pbkdf2;
|
||||
|
||||
pub struct ThreadPool {
|
||||
queue: Injector<JobSpec>,
|
||||
stealers: Vec<Stealer<JobSpec>>,
|
||||
pub struct ThreadPool<K> {
|
||||
queue: Injector<JobSpec<K>>,
|
||||
stealers: Vec<Stealer<JobSpec<K>>>,
|
||||
parkers: Vec<(Condvar, Mutex<ThreadState>)>,
|
||||
/// bitpacked representation.
|
||||
/// lower 8 bits = number of sleeping threads
|
||||
@@ -42,7 +49,7 @@ enum ThreadState {
|
||||
Active,
|
||||
}
|
||||
|
||||
impl ThreadPool {
|
||||
impl<K: Hash + Send + 'static> ThreadPool<K> {
|
||||
pub fn new(n_workers: u8) -> Arc<Self> {
|
||||
let workers = (0..n_workers).map(|_| Worker::new_fifo()).collect_vec();
|
||||
let stealers = workers.iter().map(|w| w.stealer()).collect_vec();
|
||||
@@ -68,11 +75,7 @@ impl ThreadPool {
|
||||
pool
|
||||
}
|
||||
|
||||
pub fn spawn_job(
|
||||
&self,
|
||||
endpoint: EndpointIdInt,
|
||||
pbkdf2: Pbkdf2,
|
||||
) -> oneshot::Receiver<[u8; 32]> {
|
||||
pub fn spawn_job(&self, key: K, pbkdf2: Pbkdf2) -> oneshot::Receiver<[u8; 32]> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let queue_was_empty = self.queue.is_empty();
|
||||
@@ -81,7 +84,7 @@ impl ThreadPool {
|
||||
self.queue.push(JobSpec {
|
||||
response: tx,
|
||||
pbkdf2,
|
||||
endpoint,
|
||||
key,
|
||||
});
|
||||
|
||||
// inspired from <https://github.com/rayon-rs/rayon/blob/3e3962cb8f7b50773bcc360b48a7a674a53a2c77/rayon-core/src/sleep/mod.rs#L242>
|
||||
@@ -139,7 +142,12 @@ impl ThreadPool {
|
||||
}
|
||||
}
|
||||
|
||||
fn steal(&self, rng: &mut impl Rng, skip: usize, worker: &Worker<JobSpec>) -> Option<JobSpec> {
|
||||
fn steal(
|
||||
&self,
|
||||
rng: &mut impl Rng,
|
||||
skip: usize,
|
||||
worker: &Worker<JobSpec<K>>,
|
||||
) -> Option<JobSpec<K>> {
|
||||
// announce thread as idle
|
||||
self.counters.fetch_add(256, Ordering::SeqCst);
|
||||
|
||||
@@ -188,7 +196,11 @@ impl ThreadPool {
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_rt(pool: Arc<ThreadPool>, worker: Worker<JobSpec>, index: usize) {
|
||||
fn thread_rt<K: Hash + Send + 'static>(
|
||||
pool: Arc<ThreadPool<K>>,
|
||||
worker: Worker<JobSpec<K>>,
|
||||
index: usize,
|
||||
) {
|
||||
/// interval when we should steal from the global queue
|
||||
/// so that tail latencies are managed appropriately
|
||||
const STEAL_INTERVAL: usize = 61;
|
||||
@@ -236,7 +248,7 @@ fn thread_rt(pool: Arc<ThreadPool>, worker: Worker<JobSpec>, index: usize) {
|
||||
|
||||
// receiver is closed, cancel the task
|
||||
if !job.response.is_closed() {
|
||||
let rate = sketch.inc_and_return(&job.endpoint, job.pbkdf2.cost());
|
||||
let rate = sketch.inc_and_return(&job.key, job.pbkdf2.cost());
|
||||
|
||||
const P: f64 = 2000.0;
|
||||
// probability decreases as rate increases.
|
||||
@@ -287,24 +299,96 @@ fn thread_rt(pool: Arc<ThreadPool>, worker: Worker<JobSpec>, index: usize) {
|
||||
}
|
||||
}
|
||||
|
||||
struct JobSpec {
|
||||
struct JobSpec<K> {
|
||||
response: oneshot::Sender<[u8; 32]>,
|
||||
pbkdf2: Pbkdf2,
|
||||
endpoint: EndpointIdInt,
|
||||
key: K,
|
||||
}
|
||||
|
||||
pub struct ThreadPoolWorkers(usize);
|
||||
pub struct ThreadPoolWorkerId(pub usize);
|
||||
|
||||
impl LabelValue for ThreadPoolWorkerId {
|
||||
fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
|
||||
v.write_int(self.0 as i64)
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelGroup for ThreadPoolWorkerId {
|
||||
fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
|
||||
v.write_value(LabelName::from_str("worker"), self);
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelGroupSet for ThreadPoolWorkers {
|
||||
type Group<'a> = ThreadPoolWorkerId;
|
||||
|
||||
fn cardinality(&self) -> Option<usize> {
|
||||
Some(self.0)
|
||||
}
|
||||
|
||||
fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
|
||||
Some(value)
|
||||
}
|
||||
|
||||
fn decode_dense(&self, value: usize) -> Self::Group<'_> {
|
||||
ThreadPoolWorkerId(value)
|
||||
}
|
||||
|
||||
type Unique = usize;
|
||||
|
||||
fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
|
||||
Some(value.0)
|
||||
}
|
||||
|
||||
fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
|
||||
ThreadPoolWorkerId(*value)
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelSet for ThreadPoolWorkers {
|
||||
type Value<'a> = ThreadPoolWorkerId;
|
||||
|
||||
fn dynamic_cardinality(&self) -> Option<usize> {
|
||||
Some(self.0)
|
||||
}
|
||||
|
||||
fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
|
||||
(value.0 < self.0).then_some(value.0)
|
||||
}
|
||||
|
||||
fn decode(&self, value: usize) -> Self::Value<'_> {
|
||||
ThreadPoolWorkerId(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl FixedCardinalitySet for ThreadPoolWorkers {
|
||||
fn cardinality(&self) -> usize {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new(workers: usize))]
|
||||
pub struct ThreadPoolMetrics {
|
||||
pub injector_queue_depth: Gauge,
|
||||
#[metric(init = GaugeVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
pub worker_queue_depth: GaugeVec<ThreadPoolWorkers>,
|
||||
#[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
|
||||
#[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::EndpointId;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn hash_is_correct() {
|
||||
let pool = ThreadPool::new(1);
|
||||
|
||||
let ep = EndpointId::from("foo");
|
||||
let ep = EndpointIdInt::from(ep);
|
||||
let ep = "foo";
|
||||
|
||||
let salt = [0x55; 32];
|
||||
let actual = pool
|
||||
@@ -30,13 +30,17 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
|
||||
clap = { version = "4", features = ["derive", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
|
||||
der = { version = "0.7", default-features = false, features = ["oid", "pem", "std"] }
|
||||
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
|
||||
digest = { version = "0.10", features = ["mac", "oid", "std"] }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
futures-executor = { version = "0.3" }
|
||||
futures-io = { version = "0.3" }
|
||||
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
||||
generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
hashbrown = { version = "0.14", features = ["raw"] }
|
||||
hex = { version = "0.4", features = ["serde"] }
|
||||
@@ -44,6 +48,7 @@ hmac = { version = "0.12", default-features = false, features = ["reset"] }
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
itertools = { version = "0.10" }
|
||||
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
@@ -64,8 +69,10 @@ rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
||||
scopeguard = { version = "1" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
serde_json = { version = "1", features = ["raw_value"] }
|
||||
sha2 = { version = "0.10", features = ["asm"] }
|
||||
sha2 = { version = "0.10", features = ["asm", "oid"] }
|
||||
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
|
||||
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
|
||||
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
|
||||
subtle = { version = "2" }
|
||||
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
|
||||
tikv-jemalloc-sys = { version = "0.5" }
|
||||
@@ -81,7 +88,7 @@ tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
uuid = { version = "1", features = ["serde", "v4", "v7"] }
|
||||
zeroize = { version = "1", features = ["derive"] }
|
||||
zeroize = { version = "1", features = ["derive", "serde"] }
|
||||
zstd = { version = "0.13" }
|
||||
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
|
||||
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }
|
||||
@@ -97,6 +104,7 @@ getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
hashbrown = { version = "0.14", features = ["raw"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
itertools = { version = "0.10" }
|
||||
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
|
||||
Reference in New Issue
Block a user