storage_broker: update hyper and tonic again (#9299)

Update hyper and tonic again in the storage broker, this time with a fix
for the issue that made us revert the update last time.

The first commit is a revert of #9268, the second a fix for the issue.

fixes #9231.
This commit is contained in:
Arpad Müller
2024-10-07 21:12:13 +02:00
committed by GitHub
parent 6eba29c732
commit 912d47ec02
8 changed files with 196 additions and 303 deletions

264
Cargo.lock generated
View File

@@ -666,34 +666,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
dependencies = [
"async-trait",
"axum-core 0.3.4",
"bitflags 1.3.2",
"bytes",
"futures-util",
"http 0.2.9",
"http-body 0.4.5",
"hyper 0.14.30",
"itoa",
"matchit 0.7.0",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper 0.1.2",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.7.5"
@@ -701,7 +673,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
dependencies = [
"async-trait",
"axum-core 0.4.5",
"axum-core",
"base64 0.21.1",
"bytes",
"futures-util",
@@ -731,23 +703,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-core"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 0.2.9",
"http-body 0.4.5",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.4.5"
@@ -971,7 +926,7 @@ dependencies = [
"clang-sys",
"itertools 0.12.1",
"log",
"prettyplease 0.2.17",
"prettyplease",
"proc-macro2",
"quote",
"regex",
@@ -2454,15 +2409,6 @@ dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "hostname"
version = "0.4.0"
@@ -2657,14 +2603,15 @@ dependencies = [
[[package]]
name = "hyper-timeout"
version = "0.4.1"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 0.14.30",
"hyper 1.4.1",
"hyper-util",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
"tower-service",
]
[[package]]
@@ -3470,7 +3417,7 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost 0.13.3",
"prost",
"reqwest 0.12.4",
"thiserror",
]
@@ -3483,8 +3430,8 @@ checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"prost 0.13.3",
"tonic 0.12.3",
"prost",
"tonic",
]
[[package]]
@@ -4178,16 +4125,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "prettyplease"
version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86"
dependencies = [
"proc-macro2",
"syn 1.0.109",
]
[[package]]
name = "prettyplease"
version = "0.2.17"
@@ -4258,16 +4195,6 @@ dependencies = [
"thiserror",
]
[[package]]
name = "prost"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes",
"prost-derive 0.11.9",
]
[[package]]
name = "prost"
version = "0.13.3"
@@ -4275,42 +4202,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f"
dependencies = [
"bytes",
"prost-derive 0.13.3",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.11.9"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck 0.4.1",
"itertools 0.10.5",
"lazy_static",
"heck 0.5.0",
"itertools 0.12.1",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease 0.1.25",
"prost 0.11.9",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn 1.0.109",
"syn 2.0.52",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
dependencies = [
"anyhow",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
@@ -4328,11 +4241,11 @@ dependencies = [
[[package]]
name = "prost-types"
version = "0.11.9"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670"
dependencies = [
"prost 0.11.9",
"prost",
]
[[package]]
@@ -5094,6 +5007,21 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rustls"
version = "0.23.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebbbdb961df0ad3f2652da8f3fdc4b36122f568f968f45ad3316f26c025c677b"
dependencies = [
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki 0.102.2",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.2"
@@ -5119,6 +5047,19 @@ dependencies = [
"security-framework",
]
[[package]]
name = "rustls-native-certs"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a"
dependencies = [
"openssl-probe",
"rustls-pemfile 2.1.1",
"rustls-pki-types",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.2"
@@ -5194,6 +5135,7 @@ dependencies = [
"fail",
"futures",
"hex",
"http 1.1.0",
"humantime",
"hyper 0.14.30",
"metrics",
@@ -5750,19 +5692,22 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"bytes",
"clap",
"const_format",
"futures",
"futures-core",
"futures-util",
"http-body-util",
"humantime",
"hyper 0.14.30",
"hyper 1.4.1",
"hyper-util",
"metrics",
"once_cell",
"parking_lot 0.12.1",
"prost 0.11.9",
"prost",
"tokio",
"tonic 0.9.2",
"tonic",
"tonic-build",
"tracing",
"utils",
@@ -6306,6 +6251,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls 0.23.7",
"rustls-pki-types",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.16"
@@ -6397,29 +6353,30 @@ dependencies = [
[[package]]
name = "tonic"
version = "0.9.2"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [
"async-stream",
"async-trait",
"axum 0.6.20",
"base64 0.21.1",
"axum",
"base64 0.22.1",
"bytes",
"futures-core",
"futures-util",
"h2 0.3.26",
"http 0.2.9",
"http-body 0.4.5",
"hyper 0.14.30",
"h2 0.4.4",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"prost 0.11.9",
"rustls-native-certs 0.6.2",
"rustls-pemfile 1.0.2",
"prost",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
"socket2",
"tokio",
"tokio-rustls 0.24.0",
"tokio-rustls 0.26.0",
"tokio-stream",
"tower",
"tower-layer",
@@ -6428,37 +6385,17 @@ dependencies = [
]
[[package]]
name = "tonic"
name = "tonic-build"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11"
dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"percent-encoding",
"pin-project",
"prost 0.13.3",
"tokio-stream",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07"
dependencies = [
"prettyplease 0.1.25",
"prettyplease",
"proc-macro2",
"prost-build",
"prost-types",
"quote",
"syn 1.0.109",
"syn 2.0.52",
]
[[package]]
@@ -6864,7 +6801,7 @@ name = "vm_monitor"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.7.5",
"axum",
"cgroups-rs",
"clap",
"futures",
@@ -7095,18 +7032,6 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix",
]
[[package]]
name = "whoami"
version = "1.5.1"
@@ -7335,9 +7260,10 @@ version = "0.1.0"
dependencies = [
"ahash",
"anyhow",
"axum",
"axum-core",
"base64 0.21.1",
"base64ct",
"bitflags 2.4.1",
"bytes",
"camino",
"cc",
@@ -7365,7 +7291,6 @@ dependencies = [
"hyper 1.4.1",
"hyper-util",
"indexmap 1.9.3",
"itertools 0.10.5",
"itertools 0.12.1",
"lazy_static",
"libc",
@@ -7377,15 +7302,15 @@ dependencies = [
"num-traits",
"once_cell",
"parquet",
"prettyplease",
"proc-macro2",
"prost 0.11.9",
"prost",
"quote",
"rand 0.8.5",
"regex",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"reqwest 0.12.4",
"rustls 0.21.11",
"scopeguard",
"serde",
"serde_json",
@@ -7401,9 +7326,10 @@ dependencies = [
"time",
"time-macros",
"tokio",
"tokio-rustls 0.24.0",
"tokio-stream",
"tokio-util",
"toml_edit",
"tonic",
"tower",
"tracing",
"tracing-core",

View File

@@ -130,7 +130,7 @@ pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
prost = "0.13"
rand = "0.8"
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
@@ -178,7 +178,7 @@ tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tonic = {version = "0.12.3", features = ["tls", "tls-roots"]}
tower-service = "0.3.2"
tracing = "0.1"
tracing-error = "0.2"
@@ -246,7 +246,7 @@ criterion = "0.5.1"
rcgen = "0.12"
rstest = "0.18"
camino-tempfile = "1.0.2"
tonic-build = "0.9"
tonic-build = "0.12"
[patch.crates-io]

View File

@@ -23,6 +23,7 @@ crc32c.workspace = true
fail.workspace = true
hex.workspace = true
humantime.workspace = true
http.workspace = true
hyper0.workspace = true
futures.workspace = true
once_cell.workspace = true

View File

@@ -13,7 +13,7 @@ use desim::{
node_os::NodeOs,
proto::{AnyMessage, NetEvent, NodeEvent},
};
use hyper0::Uri;
use http::Uri;
use safekeeper::{
safekeeper::{ProposerAcceptorMessage, SafeKeeper, ServerInfo, UNKNOWN_SERVER_VERSION},
state::{TimelinePersistentState, TimelineState},

View File

@@ -10,13 +10,16 @@ bench = []
[dependencies]
anyhow.workspace = true
async-stream.workspace = true
bytes.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
futures.workspace = true
futures-core.workspace = true
futures-util.workspace = true
humantime.workspace = true
hyper0 = { workspace = true, features = ["full"] }
hyper = { workspace = true, features = ["full"] }
http-body-util.workspace = true
hyper-util = "0.1"
once_cell.workspace = true
parking_lot.workspace = true
prost.workspace = true

View File

@@ -10,16 +10,15 @@
//!
//! Only safekeeper message is supported, but it is not hard to add something
//! else with generics.
extern crate hyper0 as hyper;
use clap::{command, Parser};
use futures_core::Stream;
use futures_util::StreamExt;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::header::CONTENT_TYPE;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, StatusCode};
use hyper::service::service_fn;
use hyper::{Method, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::convert::Infallible;
@@ -27,9 +26,11 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::time;
use tonic::body::{self, empty_body, BoxBody};
use tonic::codegen::Service;
use tonic::transport::server::Connected;
use tonic::Code;
@@ -48,9 +49,7 @@ use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::{
parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
};
use storage_broker::{parse_proto_ttid, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR};
use utils::id::TenantTimelineId;
use utils::logging::{self, LogFormat};
use utils::sentry_init::init_sentry;
@@ -602,8 +601,8 @@ impl BrokerService for Broker {
// We serve only metrics and healthcheck through http1.
async fn http1_handler(
req: hyper::Request<hyper::body::Body>,
) -> Result<hyper::Response<Body>, Infallible> {
req: hyper::Request<Incoming>,
) -> Result<hyper::Response<BoxBody>, Infallible> {
let resp = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
@@ -614,16 +613,16 @@ async fn http1_handler(
hyper::Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.body(body::boxed(Full::new(bytes::Bytes::from(buffer))))
.unwrap()
}
(&Method::GET, "/status") => hyper::Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.body(empty_body())
.unwrap(),
_ => hyper::Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.body(empty_body())
.unwrap(),
};
Ok(resp)
@@ -665,52 +664,76 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
info!("listening on {}", &args.listen_addr);
// grpc is served along with http1 for metrics on a single port, hence we
// don't use tonic's Server.
hyper::Server::bind(&args.listen_addr)
.http2_keep_alive_interval(Some(args.http2_keepalive_interval))
.serve(make_service_fn(move |conn: &AddrStream| {
let storage_broker_server_cloned = storage_broker_server.clone();
let connect_info = conn.connect_info();
async move {
Ok::<_, Infallible>(service_fn(move |mut req| {
// That's what tonic's MakeSvc.call does to pass conninfo to
// the request handler (and where its request.remote_addr()
// expects it to find).
req.extensions_mut().insert(connect_info.clone());
// Technically this second clone is not needed, but consume
// by async block is apparently unavoidable. BTW, error
// message is enigmatic, see
// https://github.com/rust-lang/rust/issues/68119
//
// We could get away without async block at all, but then we
// need to resort to futures::Either to merge the result,
// which doesn't caress an eye as well.
let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
async move {
if req.headers().get("content-type").map(|x| x.as_bytes())
== Some(b"application/grpc")
{
let res_resp = storage_broker_server_svc.call(req).await;
// Grpc and http1 handlers have slightly different
// Response types: it is UnsyncBoxBody for the
// former one (not sure why) and plain hyper::Body
// for the latter. Both implement HttpBody though,
// and EitherBody is used to merge them.
res_resp.map(|resp| resp.map(EitherBody::Left))
} else {
let res_resp = http1_handler(req).await;
res_resp.map(|resp| resp.map(EitherBody::Right))
}
}
}))
let tcp_listener = TcpListener::bind(&args.listen_addr).await?;
info!("listening on {}", &args.listen_addr);
loop {
let (stream, addr) = match tcp_listener.accept().await {
Ok(v) => v,
Err(e) => {
info!("couldn't accept connection: {e}");
continue;
}
}))
.await?;
Ok(())
};
let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
builder.http1().timer(TokioTimer::new());
builder
.http2()
.timer(TokioTimer::new())
.keep_alive_interval(Some(args.http2_keepalive_interval))
// This matches the tonic server default. It allows us to support production-like workloads.
.max_concurrent_streams(None);
let storage_broker_server_cloned = storage_broker_server.clone();
let connect_info = stream.connect_info();
let service_fn_ = async move {
service_fn(move |mut req| {
// That's what tonic's MakeSvc.call does to pass conninfo to
// the request handler (and where its request.remote_addr()
// expects it to find).
req.extensions_mut().insert(connect_info.clone());
// Technically this second clone is not needed, but consume
// by async block is apparently unavoidable. BTW, error
// message is enigmatic, see
// https://github.com/rust-lang/rust/issues/68119
//
// We could get away without async block at all, but then we
// need to resort to futures::Either to merge the result,
// which doesn't caress an eye as well.
let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
async move {
if req.headers().get("content-type").map(|x| x.as_bytes())
== Some(b"application/grpc")
{
let res_resp = storage_broker_server_svc.call(req).await;
// Grpc and http1 handlers have slightly different
// Response types: it is UnsyncBoxBody for the
// former one (not sure why) and plain hyper::Body
// for the latter. Both implement HttpBody though,
// and `Either` is used to merge them.
res_resp.map(|resp| resp.map(http_body_util::Either::Left))
} else {
let res_resp = http1_handler(req).await;
res_resp.map(|resp| resp.map(http_body_util::Either::Right))
}
}
})
}
.await;
tokio::task::spawn(async move {
let res = builder
.serve_connection(TokioIo::new(stream), service_fn_)
.await;
if let Err(e) = res {
info!("error serving connection from {addr}: {e}");
}
});
}
}
#[cfg(test)]

View File

@@ -1,8 +1,3 @@
extern crate hyper0 as hyper;
use hyper::body::HttpBody;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tonic::codegen::StdError;
use tonic::transport::{ClientTlsConfig, Endpoint};
@@ -96,56 +91,3 @@ pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTime
timeline_id,
})
}
// These several usages don't justify anyhow dependency, though it would work as
// well.
type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
// Provides impl HttpBody for two different types implementing it. Inspired by
// https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs
pub enum EitherBody<A, B> {
Left(A),
Right(B),
}
impl<A, B> HttpBody for EitherBody<A, B>
where
A: HttpBody + Send + Unpin,
B: HttpBody<Data = A::Data> + Send + Unpin,
A::Error: Into<AnyError>,
B::Error: Into<AnyError>,
{
type Data = A::Data;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
fn is_end_stream(&self) -> bool {
match self {
EitherBody::Left(b) => b.is_end_stream(),
EitherBody::Right(b) => b.is_end_stream(),
}
}
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
}
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
}
}
}
fn map_option_err<T, U: Into<AnyError>>(err: Option<Result<T, U>>) -> Option<Result<T, AnyError>> {
err.map(|e| e.map_err(Into::into))
}

View File

@@ -17,9 +17,10 @@ license.workspace = true
[dependencies]
ahash = { version = "0.8" }
anyhow = { version = "1", features = ["backtrace"] }
axum = { version = "0.7", features = ["ws"] }
axum-core = { version = "0.4", default-features = false, features = ["tracing"] }
base64 = { version = "0.21", features = ["alloc"] }
base64ct = { version = "1", default-features = false, features = ["std"] }
bitflags = { version = "2", default-features = false, features = ["std"] }
bytes = { version = "1", features = ["serde"] }
camino = { version = "1", default-features = false, features = ["serde1"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
@@ -43,11 +44,10 @@ hashbrown = { version = "0.14", features = ["raw"] }
hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["full"] }
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["http1", "http2", "server"] }
hyper-util = { version = "0.1", features = ["http1", "http2", "server", "tokio"] }
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" }
itertools = { version = "0.12" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -58,13 +58,12 @@ num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prost = { version = "0.11" }
prost = { version = "0.13", features = ["prost-derive"] }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["raw_value"] }
@@ -77,10 +76,11 @@ sync_wrapper = { version = "0.1", default-features = false, features = ["futures
tikv-jemalloc-sys = { version = "0.5" }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "timeout", "util"] }
tonic = { version = "0.12", features = ["tls-roots"] }
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
url = { version = "2", features = ["serde"] }
@@ -92,7 +92,6 @@ zstd-sys = { version = "2", default-features = false, features = ["legacy", "std
[build-dependencies]
ahash = { version = "0.8" }
anyhow = { version = "1", features = ["backtrace"] }
bitflags = { version = "2", default-features = false, features = ["std"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
@@ -101,9 +100,7 @@ getrandom = { version = "0.2", default-features = false, features = ["std"] }
half = { version = "2", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
itertools = { version = "0.12" }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2" }
@@ -113,8 +110,9 @@ num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prettyplease = { version = "0.2", default-features = false, features = ["verbatim"] }
proc-macro2 = { version = "1" }
prost = { version = "0.11" }
prost = { version = "0.13", features = ["prost-derive"] }
quote = { version = "1" }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }