diff --git a/Cargo.lock b/Cargo.lock index 0053c3b08c..9a534142ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,7 +280,7 @@ dependencies = [ "control_plane", "futures", "git-version", - "hyper", + "hyper 1.1.0", "metrics", "pageserver_api", "pageserver_client", @@ -324,8 +324,8 @@ dependencies = [ "bytes", "fastrand 2.0.0", "hex", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.26", "ring 0.17.6", "time", "tokio", @@ -355,8 +355,8 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "pin-project-lite", "tracing", ] @@ -377,7 +377,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "fastrand 2.0.0", - "http", + "http 0.2.9", "percent-encoding", "tracing", "uuid", @@ -404,8 +404,8 @@ dependencies = [ "aws-smithy-xml", "aws-types", "bytes", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "once_cell", "percent-encoding", "regex", @@ -430,7 +430,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http", + "http 0.2.9", "regex", "tracing", ] @@ -452,7 +452,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "http", + "http 0.2.9", "regex", "tracing", ] @@ -475,7 +475,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "http", + "http 0.2.9", "regex", "tracing", ] @@ -496,7 +496,7 @@ dependencies = [ "form_urlencoded", "hex", "hmac", - "http", + "http 0.2.9", "once_cell", "p256", "percent-encoding", @@ -532,8 +532,8 @@ dependencies = [ "crc32c", "crc32fast", "hex", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "md-5", "pin-project-lite", "sha1", @@ -564,8 +564,8 @@ dependencies = [ "bytes", "bytes-utils", "futures-core", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "once_cell", "percent-encoding", "pin-project-lite", @@ -604,10 +604,10 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand 2.0.0", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.24", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", "hyper-rustls", "once_cell", "pin-project-lite", @@ -626,7 +626,7 @@ dependencies = [ "aws-smithy-async", "aws-smithy-types", "bytes", - "http", + "http 0.2.9", "pin-project-lite", "tokio", "tracing", @@ -635,16 +635,19 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.0.2" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7f48b3f27ddb40ab19892a5abda331f403e3cb877965e4e51171447807104af" +checksum = "4e9a85eafeaf783b2408e35af599e8b96f2c49d9a5d13ad3a887fbdefb6bc744" dependencies = [ "base64-simd", "bytes", "bytes-utils", "futures-core", - "http", - "http-body", + "http 0.2.9", + "http 1.0.0", + "http-body 0.4.5", + "http-body 1.0.0", + "http-body-util", "itoa", "num-integer", "pin-project-lite", @@ -675,7 +678,7 @@ dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "http", + "http 0.2.9", "rustc_version", "tracing", ] @@ -692,9 +695,9 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", "itoa", "matchit", "memchr", @@ -724,8 +727,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "mime", "rustversion", "tower-layer", @@ -1214,7 +1217,11 @@ dependencies = [ "compute_api", "flate2", "futures", - "hyper", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.1.0", + "hyper-util", "nix 0.26.2", "notify", "num_cpus", @@ -1330,7 +1337,7 @@ dependencies = [ "futures", "git-version", "hex", - "hyper", + "hyper 1.1.0", "nix 0.26.2", "once_cell", "pageserver_api", @@ -1974,9 +1981,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" @@ -2147,7 +2154,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.9", + "indexmap 2.0.1", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", "indexmap 2.0.1", "slab", "tokio", @@ -2298,6 +2324,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -2305,7 +2342,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.9", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -2367,9 +2427,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.24", + "http 0.2.9", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -2381,14 +2441,34 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7" dependencies = [ - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.26", "log", "rustls", "rustls-native-certs", @@ -2402,7 +2482,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.26", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -2415,7 +2495,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.26", "native-tls", "tokio", "tokio-native-tls", @@ -2427,13 +2507,31 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cc7dcb1ab67cd336f468a12491765672e61a3b6b148634dbfe2fe8acd3fe7d9" dependencies = [ - "hyper", + "hyper 0.14.26", "pin-project-lite", "tokio", "tokio-tungstenite", "tungstenite", ] +[[package]] +name = "hyper-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdea9aac0dbe5a9240d68cfd9501e2db94222c6dc06843e06640b9e07f0fdc67" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2 0.5.5", + "tokio", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.56" @@ -3016,7 +3114,7 @@ dependencies = [ "base64 0.13.1", "chrono", "getrandom 0.2.11", - "http", + "http 0.2.9", "rand 0.8.5", "serde", "serde_json", @@ -3118,7 +3216,7 @@ checksum = "a819b71d6530c4297b49b3cae2939ab3a8cc1b9f382826a1bc29dd0ca3864906" dependencies = [ "async-trait", "bytes", - "http", + "http 0.2.9", "opentelemetry_api", "reqwest", ] @@ -3132,11 +3230,11 @@ dependencies = [ "async-trait", "futures", "futures-util", - "http", + "http 0.2.9", "opentelemetry", "opentelemetry-http", "opentelemetry-proto", - "prost", + "prost 0.11.9", "reqwest", "thiserror", ] @@ -3150,7 +3248,7 @@ dependencies = [ "futures", "futures-util", "opentelemetry", - "prost", + "prost 0.11.9", "tonic 0.8.3", ] @@ -3321,7 +3419,7 @@ dependencies = [ "hex-literal", "humantime", "humantime-serde", - "hyper", + "hyper 1.1.0", "itertools", "md5", "metrics", @@ -3869,6 +3967,15 @@ dependencies = [ "prost-derive", ] +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", +] + [[package]] name = "prost-build" version = "0.11.9" @@ -3883,7 +3990,7 @@ dependencies = [ "multimap", "petgraph", "prettyplease 0.1.25", - "prost", + "prost 0.11.9", "prost-types", "regex", "syn 1.0.109", @@ -3910,7 +4017,7 @@ version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ - "prost", + "prost 0.11.9", ] [[package]] @@ -3936,7 +4043,7 @@ dependencies = [ "hmac", "hostname", "humantime", - "hyper", + "hyper 1.1.0", "hyper-tungstenite", "ipnet", "itertools", @@ -4234,8 +4341,12 @@ dependencies = [ "camino-tempfile", "futures", "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", "http-types", - "hyper", + "hyper 1.1.0", + "hyper-util", "itertools", "metrics", "once_cell", @@ -4264,10 +4375,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.24", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", "hyper-rustls", "hyper-tls", "ipnet", @@ -4306,7 +4417,7 @@ checksum = "4531c89d50effe1fac90d095c8b133c20c5c714204feee0bfc3fd158e784209d" dependencies = [ "anyhow", "async-trait", - "http", + "http 0.2.9", "reqwest", "serde", "task-local-extensions", @@ -4324,8 +4435,8 @@ dependencies = [ "chrono", "futures", "getrandom 0.2.11", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.26", "parking_lot 0.11.2", "reqwest", "reqwest-middleware", @@ -4408,14 +4519,19 @@ dependencies = [ [[package]] name = "routerify" version = "3.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "496c1d3718081c45ba9c31fbfc07417900aa96f4070ff90dc29961836b7a9945" +source = "git+https://github.com/conradludgate/routerify?branch=hyper1#4e853cc126bcf1eb97c65352177d3013de880f5f" dependencies = [ - "http", - "hyper", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.1.0", "lazy_static", "percent-encoding", + "pin-project-lite", "regex", + "sync_wrapper", ] [[package]] @@ -4538,9 +4654,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", "rustls-pemfile", @@ -4649,7 +4765,7 @@ dependencies = [ "git-version", "hex", "humantime", - "hyper", + "hyper 1.1.0", "metrics", "once_cell", "parking_lot 0.12.1", @@ -5186,15 +5302,16 @@ dependencies = [ "futures-core", "futures-util", "git-version", + "http-body-util", "humantime", - "hyper", + "hyper 1.1.0", "metrics", "once_cell", "parking_lot 0.12.1", - "prost", + "prost 0.11.9", "tokio", "tokio-stream", - "tonic 0.9.2", + "tonic 0.10.0", "tonic-build", "tracing", "utils", @@ -5488,7 +5605,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81294c017957a1a69794f506723519255879e15a870507faf45dfed288b763dd" dependencies = [ "futures-util", - "hyper", + "hyper 0.14.26", "pin-project-lite", "thiserror", "tokio", @@ -5693,14 +5810,14 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.24", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.11.9", "prost-derive", "tokio", "tokio-stream", @@ -5714,25 +5831,23 @@ dependencies = [ [[package]] name = "tonic" -version = "0.9.2" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +checksum = "5469afaf78a11265c343a88969045c1568aa8ecc6c787dbf756e92e70f199861" dependencies = [ "async-stream", "async-trait", "axum", "base64 0.21.1", "bytes", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.24", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.12.3", "rustls-native-certs", "rustls-pemfile", "tokio", @@ -5924,7 +6039,7 @@ dependencies = [ name = "tracing-utils" version = "0.1.0" dependencies = [ - "hyper", + "hyper 1.1.0", "opentelemetry", "opentelemetry-otlp", "opentelemetry-semantic-conventions", @@ -5951,7 +6066,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.9", "httparse", "log", "rand 0.8.5", @@ -6114,7 +6229,8 @@ dependencies = [ "heapless", "hex", "hex-literal", - "hyper", + "http-body-util", + "hyper 1.1.0", "jsonwebtoken", "metrics", "nix 0.26.2", @@ -6639,7 +6755,7 @@ dependencies = [ "hashbrown 0.14.0", "hex", "hmac", - "hyper", + "hyper 0.14.26", "indexmap 1.9.3", "itertools", "libc", @@ -6651,7 +6767,7 @@ dependencies = [ "num-traits", "once_cell", "parquet", - "prost", + "prost 0.11.9", "rand 0.8.5", "regex", "regex-automata 0.4.3", diff --git a/Cargo.toml b/Cargo.toml index 5d5d2f4a55..146d7eb114 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ async-trait = "0.1" aws-config = { version = "1.0", default-features = false, features=["rustls"] } aws-sdk-s3 = "1.0" aws-smithy-async = { version = "1.0", default-features = false, features=["rt-tokio"] } -aws-smithy-types = "1.0" +aws-smithy-types = { version = "1.1.2", features = ["http-body-1-x"] } aws-credential-types = "1.0" axum = { version = "0.6.20", features = ["ws"] } base64 = "0.13.0" @@ -89,7 +89,11 @@ hostname = "0.3.1" http-types = { version = "2", default-features = false } humantime = "2.1" humantime-serde = "1.1.1" -hyper = "0.14" +hyper = "1.0.0" +hyper-util = "0.1.0" +http = "1" +http-body = "1" +http-body-util = "0.1" hyper-tungstenite = "0.11" inotify = "0.10.2" ipnet = "2.9.0" @@ -121,7 +125,7 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls" reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_19"] } reqwest-middleware = "0.2.0" reqwest-retry = "0.2.2" -routerify = "3" +routerify = { git = "https://github.com/conradludgate/routerify", branch = "hyper1" } rpds = "0.13" rustc-hash = "1.1.0" rustls = "0.21" @@ -159,7 +163,7 @@ tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.7" toml_edit = "0.19" -tonic = {version = "0.9", features = ["tls", "tls-roots"]} +tonic = {version = "0.10", features = ["tls", "tls-roots"]} tracing = "0.1" tracing-error = "0.2.0" tracing-opentelemetry = "0.19.0" diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 759a117ee9..bb1a712acd 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -12,7 +12,11 @@ cfg-if.workspace = true clap.workspace = true flate2.workspace = true futures.workspace = true -hyper = { workspace = true, features = ["full"] } +hyper = { workspace = true, features = ["server"] } +hyper-util = { workspace = true, features = ["tokio", "server", "server-auto"] } +http = { workspace = true, features = [] } +http-body = { workspace = true, features = [] } +http-body-util = { workspace = true, features = [] } nix.workspace = true notify.workspace = true num_cpus.workspace = true diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index fa2c4cff28..c18fa16946 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -6,14 +6,21 @@ use std::sync::Arc; use std::thread; use crate::compute::{ComputeNode, ComputeState, ParsedSpec}; +use crate::http::body::Body; use compute_api::requests::ConfigurationRequest; use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError}; use anyhow::Result; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use http_body_util::BodyExt; +use hyper::body::Incoming; +use hyper::service::service_fn; +use hyper::{Method, Request, Response, StatusCode}; +use hyper_util::rt::TokioExecutor; +use hyper_util::rt::TokioIo; +use hyper_util::server::conn; use num_cpus; use serde_json; +use tokio::net::TcpListener; use tokio::task; use tracing::{error, info, warn}; use tracing_utils::http::OtelName; @@ -36,7 +43,7 @@ fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse { } // Service function to handle all available routes. -async fn routes(req: Request, compute: &Arc) -> Response { +async fn routes(req: Request, compute: &Arc) -> Response { // // NOTE: The URI path is currently included in traces. That's OK because // it doesn't contain any variable parts or sensitive information. But @@ -210,7 +217,7 @@ async fn routes(req: Request, compute: &Arc) -> Response, + req: Request, compute: &Arc, ) -> Result { if !compute.live_config_allowed { @@ -220,7 +227,7 @@ async fn handle_configure_request( )); } - let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); + let body_bytes = req.into_body().collect().await.unwrap().to_bytes(); let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap(); if let Ok(request) = serde_json::from_str::(&spec_raw) { let spec = request.spec; @@ -304,35 +311,43 @@ async fn serve(port: u16, state: Arc) { // see e.g. https://github.com/rust-lang/rust/pull/34440 let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port); - let make_service = make_service_fn(move |_conn| { + let service = service_fn(move |req: Request| { let state = state.clone(); async move { - Ok::<_, Infallible>(service_fn(move |req: Request| { - let state = state.clone(); - async move { - Ok::<_, Infallible>( - // NOTE: We include the URI path in the string. It - // doesn't contain any variable parts or sensitive - // information in this API. - tracing_utils::http::tracing_handler( - req, - |req| routes(req, &state), - OtelName::UriPath, - ) - .await, - ) - } - })) + Ok::<_, Infallible>( + // NOTE: We include the URI path in the string. It + // doesn't contain any variable parts or sensitive + // information in this API. + tracing_utils::http::tracing_handler( + req, + |req| routes(req, &state), + OtelName::UriPath, + ) + .await, + ) } }); info!("starting HTTP server on {}", addr); - let server = Server::bind(&addr).serve(make_service); - - // Run this server forever - if let Err(e) = server.await { - error!("server error: {}", e); + let listener = TcpListener::bind(addr).await.unwrap(); + loop { + let (stream, _) = match listener.accept().await { + Ok(r) => r, + Err(e) => { + error!("server error: {}", e); + return; + } + }; + let io = TokioIo::new(stream); + let service = service.clone(); + tokio::task::spawn(async move { + let builder = conn::auto::Builder::new(TokioExecutor::new()); + let res = builder.serve_connection(io, service).await; + if let Err(err) = res { + println!("Error serving connection: {:?}", err); + } + }); } } diff --git a/compute_tools/src/http/body.rs b/compute_tools/src/http/body.rs new file mode 100644 index 0000000000..5744e26c7b --- /dev/null +++ b/compute_tools/src/http/body.rs @@ -0,0 +1,124 @@ +// https://docs.rs/axum-core/0.4.3/src/axum_core/body.rs.html + +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use bytes::Bytes; +use futures::{TryStream, TryStreamExt}; +use http_body::Frame; +use http_body_util::{BodyExt, StreamBody}; + +type BoxError = Box; +type BoxBody = http_body_util::combinators::BoxBody; + +fn boxed(body: B) -> BoxBody +where + B: http_body::Body + Sync + Send + 'static, + B::Error: Into, +{ + try_downcast(body).unwrap_or_else(|body| body.map_err(|x| x.into()).boxed()) +} + +pub(crate) fn try_downcast(k: K) -> Result +where + T: 'static, + K: Send + 'static, +{ + let mut k = Some(k); + if let Some(k) = ::downcast_mut::>(&mut k) { + Ok(k.take().unwrap()) + } else { + Err(k.unwrap()) + } +} + +/// The body type used in axum requests and responses. +#[derive(Debug)] +pub struct Body(BoxBody); + +impl Body { + /// Create a new `Body` that wraps another [`http_body::Body`]. + pub fn new(body: B) -> Self + where + B: http_body::Body + Sync + Send + 'static, + B::Error: Into, + { + try_downcast(body).unwrap_or_else(|body| Self(boxed(body))) + } + + /// Create an empty body. + pub fn empty() -> Self { + Self::new(http_body_util::Empty::new()) + } + + /// Create a new `Body` from a [`Stream`]. + /// + /// [`Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html + pub fn from_stream(stream: S) -> Self + where + S: TryStream + Sync + Send + 'static, + S::Ok: Into, + S::Error: Into, + { + Self::new(StreamBody::new( + stream.map_ok(|chunk| Frame::data(chunk.into())), + )) + } +} + +impl Default for Body { + fn default() -> Self { + Self::empty() + } +} + +impl From<()> for Body { + fn from(_: ()) -> Self { + Self::empty() + } +} + +macro_rules! body_from_impl { + ($ty:ty) => { + impl From<$ty> for Body { + fn from(buf: $ty) -> Self { + Self::new(http_body_util::Full::from(buf)) + } + } + }; +} + +body_from_impl!(&'static [u8]); +body_from_impl!(std::borrow::Cow<'static, [u8]>); +body_from_impl!(Vec); + +body_from_impl!(&'static str); +body_from_impl!(std::borrow::Cow<'static, str>); +body_from_impl!(String); + +body_from_impl!(Bytes); + +impl http_body::Body for Body { + type Data = Bytes; + type Error = BoxError; + + #[inline] + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Pin::new(&mut self.0).poll_frame(cx) + } + + #[inline] + fn size_hint(&self) -> http_body::SizeHint { + self.0.size_hint() + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.0.is_end_stream() + } +} diff --git a/compute_tools/src/http/mod.rs b/compute_tools/src/http/mod.rs index e5fdf85eed..cf2b69a844 100644 --- a/compute_tools/src/http/mod.rs +++ b/compute_tools/src/http/mod.rs @@ -1 +1,2 @@ pub mod api; +pub mod body; diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 2cc59a947b..5f9ee5e701 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -15,7 +15,11 @@ aws-sdk-s3.workspace = true aws-credential-types.workspace = true bytes.workspace = true camino.workspace = true -hyper = { workspace = true, features = ["stream"] } +hyper = { workspace = true, features = [] } +hyper-util = { workspace = true, features = [] } +http = { workspace = true, features = [] } +http-body = { workspace = true, features = [] } +http-body-util = { workspace = true, features = [] } futures.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index d7b41edaaf..ae13cb67aa 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -36,7 +36,9 @@ use aws_smithy_types::body::SdkBody; use aws_smithy_types::byte_stream::ByteStream; use bytes::Bytes; use futures::stream::Stream; -use hyper::Body; +use futures_util::TryStreamExt; +use http_body::Frame; +use http_body_util::StreamBody; use scopeguard::ScopeGuard; use super::StorageMetadata; @@ -469,8 +471,8 @@ impl RemoteStorage for S3Bucket { let started_at = start_measuring_requests(kind); - let body = Body::wrap_stream(from); - let bytes_stream = ByteStream::new(SdkBody::from_body_0_4(body)); + let body = StreamBody::new(from.map_ok(Frame::data)); + let bytes_stream = ByteStream::new(SdkBody::from_body_1_x(body)); let res = self .client diff --git a/libs/tracing-utils/src/http.rs b/libs/tracing-utils/src/http.rs index f5ab267ff3..b386f36a72 100644 --- a/libs/tracing-utils/src/http.rs +++ b/libs/tracing-utils/src/http.rs @@ -1,7 +1,8 @@ //! Tracing wrapper for Hyper HTTP server +use hyper::body::Body; use hyper::HeaderMap; -use hyper::{Body, Request, Response}; +use hyper::{Request, Response}; use std::future::Future; use tracing::Instrument; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -35,14 +36,14 @@ pub enum OtelName<'a> { /// instrumentation libraries at: /// /// If a Hyper crate appears, consider switching to that. -pub async fn tracing_handler( - req: Request, +pub async fn tracing_handler( + req: Request, handler: F, otel_name: OtelName<'_>, -) -> Response +) -> Response where - F: Fn(Request) -> R, - R: Future>, + F: Fn(Request) -> R, + R: Future>, { // Create a tracing span, with context propagated from the incoming // request if any. diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 706b7a3187..0388fd34c0 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -22,6 +22,7 @@ chrono.workspace = true heapless.workspace = true hex = { workspace = true, features = ["serde"] } hyper = { workspace = true, features = ["full"] } +http-body-util = { workspace = true, features = [] } fail.workspace = true futures = { workspace = true} jsonwebtoken.workspace = true diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs index 8704b72921..f83b4ccfe8 100644 --- a/libs/utils/src/failpoint_support.rs +++ b/libs/utils/src/failpoint_support.rs @@ -4,7 +4,8 @@ use crate::http::{ error::ApiError, json::{json_request, json_response}, }; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; +use routerify::Body; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; use tracing::*; diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs index 550ab10700..169cb47efe 100644 --- a/libs/utils/src/http/endpoint.rs +++ b/libs/utils/src/http/endpoint.rs @@ -4,11 +4,11 @@ use anyhow::Context; use hyper::header::{HeaderName, AUTHORIZATION}; use hyper::http::HeaderValue; use hyper::Method; -use hyper::{header::CONTENT_TYPE, Body, Request, Response}; +use hyper::{header::CONTENT_TYPE, Request, Response}; use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder}; use once_cell::sync::Lazy; use routerify::ext::RequestExt; -use routerify::{Middleware, RequestInfo, Router, RouterBuilder}; +use routerify::{Body, Middleware, RequestInfo, Router, RouterBuilder}; use tracing::{self, debug, info, info_span, warn, Instrument}; use std::future::Future; @@ -238,7 +238,7 @@ async fn prometheus_metrics_handler(_req: Request) -> Result) -> Result( +pub fn add_request_id_middleware( ) -> Middleware { Middleware::pre(move |req| async move { let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) { @@ -317,7 +317,7 @@ async fn add_request_id_header_to_response( Ok(res) } -pub fn make_router() -> RouterBuilder { +pub fn make_router() -> RouterBuilder { Router::builder() .middleware(add_request_id_middleware()) .middleware(Middleware::post_with_info( @@ -328,11 +328,11 @@ pub fn make_router() -> RouterBuilder { } pub fn attach_openapi_ui( - router_builder: RouterBuilder, + router_builder: RouterBuilder, spec: &'static [u8], spec_mount_path: &'static str, ui_mount_path: &'static str, -) -> RouterBuilder { +) -> RouterBuilder { router_builder .get(spec_mount_path, move |r| request_span(r, move |_| async move { @@ -388,7 +388,7 @@ fn parse_token(header_value: &str) -> Result<&str, ApiError> { Ok(token) } -pub fn auth_middleware( +pub fn auth_middleware( provide_auth: fn(&Request) -> Option<&SwappableJwtAuth>, ) -> Middleware { Middleware::pre(move |req| async move { @@ -423,7 +423,7 @@ pub fn add_response_header_middleware( value: &str, ) -> anyhow::Result> where - B: hyper::body::HttpBody + Send + Sync + 'static, + B: hyper::body::Body + Send + Sync + 'static, { let name = HeaderName::from_str(header).with_context(|| format!("invalid header name: {header}"))?; @@ -464,7 +464,6 @@ pub fn check_permission_with( #[cfg(test)] mod tests { use super::*; - use futures::future::poll_fn; use hyper::service::Service; use routerify::RequestServiceBuilder; use std::net::{IpAddr, SocketAddr}; @@ -473,16 +472,13 @@ mod tests { async fn test_request_id_returned() { let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap(); let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80); - let mut service = builder.build(remote_addr); - if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await { - panic!("request service is not ready: {:?}", e); - } + let service = builder.build(remote_addr); let mut req: Request = Request::default(); req.headers_mut() .append(&X_REQUEST_ID_HEADER, HeaderValue::from_str("42").unwrap()); - let resp: Response = service.call(req).await.unwrap(); + let resp: Response = service.call(req).await.unwrap(); let header_val = resp.headers().get(&X_REQUEST_ID_HEADER).unwrap(); @@ -493,13 +489,10 @@ mod tests { async fn test_request_id_empty() { let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap(); let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80); - let mut service = builder.build(remote_addr); - if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await { - panic!("request service is not ready: {:?}", e); - } + let service = builder.build(remote_addr); let req: Request = Request::default(); - let resp: Response = service.call(req).await.unwrap(); + let resp: Response = service.call(req).await.unwrap(); let header_val = resp.headers().get(&X_REQUEST_ID_HEADER); diff --git a/libs/utils/src/http/error.rs b/libs/utils/src/http/error.rs index 3e9281ac81..a2816418d9 100644 --- a/libs/utils/src/http/error.rs +++ b/libs/utils/src/http/error.rs @@ -1,4 +1,5 @@ -use hyper::{header, Body, Response, StatusCode}; +use hyper::{header, Response, StatusCode}; +use routerify::Body; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::error::Error as StdError; diff --git a/libs/utils/src/http/json.rs b/libs/utils/src/http/json.rs index 7ca62561fe..4afd6a9260 100644 --- a/libs/utils/src/http/json.rs +++ b/libs/utils/src/http/json.rs @@ -1,6 +1,8 @@ -use anyhow::Context; +use anyhow::{anyhow, Context}; use bytes::Buf; -use hyper::{header, Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{header, Request, Response, StatusCode}; +use routerify::Body; use serde::{Deserialize, Serialize}; use super::error::ApiError; @@ -18,10 +20,14 @@ pub async fn json_request Deserialize<'de>>( pub async fn json_request_or_empty_body Deserialize<'de>>( request: &mut Request, ) -> Result, ApiError> { - let body = hyper::body::aggregate(request.body_mut()) + let body = request + .body_mut() + .collect() .await + .map_err(|e| anyhow!(e)) .context("Failed to read request body") - .map_err(ApiError::BadRequest)?; + .map_err(ApiError::BadRequest)? + .aggregate(); if body.remaining() == 0 { return Ok(None); } diff --git a/libs/utils/src/http/mod.rs b/libs/utils/src/http/mod.rs index 74ed6bb5b2..15b73009e9 100644 --- a/libs/utils/src/http/mod.rs +++ b/libs/utils/src/http/mod.rs @@ -5,4 +5,4 @@ pub mod request; /// Current fast way to apply simple http routing in various Neon binaries. /// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed. -pub use routerify::{ext::RequestExt, RouterBuilder, RouterService}; +pub use routerify::{ext::RequestExt, RouterBuilder}; diff --git a/libs/utils/src/http/request.rs b/libs/utils/src/http/request.rs index 766bbfc9df..42c35fef3b 100644 --- a/libs/utils/src/http/request.rs +++ b/libs/utils/src/http/request.rs @@ -3,8 +3,9 @@ use std::{borrow::Cow, str::FromStr}; use super::error::ApiError; use anyhow::anyhow; -use hyper::{body::HttpBody, Body, Request}; -use routerify::ext::RequestExt; +use http_body_util::BodyExt; +use hyper::Request; +use routerify::{ext::RequestExt, Body}; pub fn get_request_param<'a>( request: &'a Request, @@ -75,7 +76,7 @@ pub fn parse_query_param>( } pub async fn ensure_no_body(request: &mut Request) -> Result<(), ApiError> { - match request.body_mut().data().await { + match request.body_mut().frame().await { Some(_) => Err(ApiError::BadRequest(anyhow!("Unexpected request body"))), None => Ok(()), } diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml index ac4b00669e..f0dc688c9c 100644 --- a/storage_broker/Cargo.toml +++ b/storage_broker/Cargo.toml @@ -19,6 +19,7 @@ futures-util.workspace = true git-version.workspace = true humantime.workspace = true hyper = { workspace = true, features = ["full"] } +http-body-util = { workspace = true } once_cell.workspace = true parking_lot.workspace = true prost.workspace = true diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index aa5d0bad5f..8d41fd0936 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -1,9 +1,6 @@ -use hyper::body::HttpBody; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::time::Duration; use tonic::codegen::StdError; -use tonic::transport::{ClientTlsConfig, Endpoint}; +use tonic::transport::{ClientTlsConfig, Endpoint, Uri}; use tonic::{transport::Channel, Status}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; @@ -27,8 +24,6 @@ pub use tonic::Code; pub use tonic::Request; pub use tonic::Streaming; -pub use hyper::Uri; - pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051"; pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}"); @@ -99,50 +94,7 @@ pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result; -// Provides impl HttpBody for two different types implementing it. Inspired by -// https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs -pub enum EitherBody { - Left(A), - Right(B), -} - -impl HttpBody for EitherBody -where - A: HttpBody + Send + Unpin, - B: HttpBody + Send + Unpin, - A::Error: Into, - B::Error: Into, -{ - type Data = A::Data; - type Error = Box; - - fn is_end_stream(&self) -> bool { - match self { - EitherBody::Left(b) => b.is_end_stream(), - EitherBody::Right(b) => b.is_end_stream(), - } - } - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - match self.get_mut() { - EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err), - EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err), - } - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - match self.get_mut() { - EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), - EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), - } - } -} +pub type EitherBody = http_body_util::Either; fn map_option_err>(err: Option>) -> Option> { err.map(|e| e.map_err(Into::into))