mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
Use hyper 1.0 and tonic 0.12 in storage broker (#9234)
Fixes #9231 . Upgrade hyper to 1.4.0 and use hyper 1.4 instead of 0.14 in the storage broker, together with tonic 0.12. The two upgrades go hand in hand. Thanks to the broker being independent from other components, we can upgrade its hyper version without touching the other components, which makes things easier.
This commit is contained in:
@@ -10,13 +10,16 @@ bench = []
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-stream.workspace = true
|
||||
bytes.workspace = true
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
const_format.workspace = true
|
||||
futures.workspace = true
|
||||
futures-core.workspace = true
|
||||
futures-util.workspace = true
|
||||
humantime.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
hyper_1 = { workspace = true, features = ["full"] }
|
||||
http-body-util.workspace = true
|
||||
hyper-util = "0.1"
|
||||
once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
prost.workspace = true
|
||||
|
||||
@@ -13,10 +13,13 @@
|
||||
use clap::{command, Parser};
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use http_body_util::Full;
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use hyper::server::conn::AddrStream;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, StatusCode};
|
||||
use hyper::service::service_fn;
|
||||
use hyper::{Method, StatusCode};
|
||||
use hyper_1 as hyper;
|
||||
use hyper_1::body::Incoming;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Infallible;
|
||||
@@ -24,9 +27,11 @@ use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::time;
|
||||
use tonic::body::{self, empty_body, BoxBody};
|
||||
use tonic::codegen::Service;
|
||||
use tonic::transport::server::Connected;
|
||||
use tonic::Code;
|
||||
@@ -45,9 +50,7 @@ use storage_broker::proto::{
|
||||
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
|
||||
SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
|
||||
};
|
||||
use storage_broker::{
|
||||
parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
|
||||
};
|
||||
use storage_broker::{parse_proto_ttid, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR};
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::logging::{self, LogFormat};
|
||||
use utils::sentry_init::init_sentry;
|
||||
@@ -599,8 +602,8 @@ impl BrokerService for Broker {
|
||||
|
||||
// We serve only metrics and healthcheck through http1.
|
||||
async fn http1_handler(
|
||||
req: hyper::Request<hyper::body::Body>,
|
||||
) -> Result<hyper::Response<Body>, Infallible> {
|
||||
req: hyper::Request<Incoming>,
|
||||
) -> Result<hyper::Response<BoxBody>, Infallible> {
|
||||
let resp = match (req.method(), req.uri().path()) {
|
||||
(&Method::GET, "/metrics") => {
|
||||
let mut buffer = vec![];
|
||||
@@ -611,16 +614,16 @@ async fn http1_handler(
|
||||
hyper::Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(Body::from(buffer))
|
||||
.body(body::boxed(Full::new(bytes::Bytes::from(buffer))))
|
||||
.unwrap()
|
||||
}
|
||||
(&Method::GET, "/status") => hyper::Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::empty())
|
||||
.body(empty_body())
|
||||
.unwrap(),
|
||||
_ => hyper::Response::builder()
|
||||
.status(StatusCode::NOT_FOUND)
|
||||
.body(Body::empty())
|
||||
.body(empty_body())
|
||||
.unwrap(),
|
||||
};
|
||||
Ok(resp)
|
||||
@@ -662,52 +665,74 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
};
|
||||
let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
|
||||
|
||||
info!("listening on {}", &args.listen_addr);
|
||||
|
||||
// grpc is served along with http1 for metrics on a single port, hence we
|
||||
// don't use tonic's Server.
|
||||
hyper::Server::bind(&args.listen_addr)
|
||||
.http2_keep_alive_interval(Some(args.http2_keepalive_interval))
|
||||
.serve(make_service_fn(move |conn: &AddrStream| {
|
||||
let storage_broker_server_cloned = storage_broker_server.clone();
|
||||
let connect_info = conn.connect_info();
|
||||
async move {
|
||||
Ok::<_, Infallible>(service_fn(move |mut req| {
|
||||
// That's what tonic's MakeSvc.call does to pass conninfo to
|
||||
// the request handler (and where its request.remote_addr()
|
||||
// expects it to find).
|
||||
req.extensions_mut().insert(connect_info.clone());
|
||||
|
||||
// Technically this second clone is not needed, but consume
|
||||
// by async block is apparently unavoidable. BTW, error
|
||||
// message is enigmatic, see
|
||||
// https://github.com/rust-lang/rust/issues/68119
|
||||
//
|
||||
// We could get away without async block at all, but then we
|
||||
// need to resort to futures::Either to merge the result,
|
||||
// which doesn't caress an eye as well.
|
||||
let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
|
||||
async move {
|
||||
if req.headers().get("content-type").map(|x| x.as_bytes())
|
||||
== Some(b"application/grpc")
|
||||
{
|
||||
let res_resp = storage_broker_server_svc.call(req).await;
|
||||
// Grpc and http1 handlers have slightly different
|
||||
// Response types: it is UnsyncBoxBody for the
|
||||
// former one (not sure why) and plain hyper::Body
|
||||
// for the latter. Both implement HttpBody though,
|
||||
// and EitherBody is used to merge them.
|
||||
res_resp.map(|resp| resp.map(EitherBody::Left))
|
||||
} else {
|
||||
let res_resp = http1_handler(req).await;
|
||||
res_resp.map(|resp| resp.map(EitherBody::Right))
|
||||
}
|
||||
}
|
||||
}))
|
||||
let tcp_listener = TcpListener::bind(&args.listen_addr).await?;
|
||||
info!("listening on {}", &args.listen_addr);
|
||||
loop {
|
||||
let (stream, addr) = match tcp_listener.accept().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
info!("couldn't accept connection: {e}");
|
||||
continue;
|
||||
}
|
||||
}))
|
||||
.await?;
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
|
||||
builder.http1().timer(TokioTimer::new());
|
||||
builder
|
||||
.http2()
|
||||
.timer(TokioTimer::new())
|
||||
.keep_alive_interval(Some(args.http2_keepalive_interval));
|
||||
|
||||
let storage_broker_server_cloned = storage_broker_server.clone();
|
||||
let connect_info = stream.connect_info();
|
||||
let service_fn_ = async move {
|
||||
service_fn(move |mut req| {
|
||||
// That's what tonic's MakeSvc.call does to pass conninfo to
|
||||
// the request handler (and where its request.remote_addr()
|
||||
// expects it to find).
|
||||
req.extensions_mut().insert(connect_info.clone());
|
||||
|
||||
// Technically this second clone is not needed, but consume
|
||||
// by async block is apparently unavoidable. BTW, error
|
||||
// message is enigmatic, see
|
||||
// https://github.com/rust-lang/rust/issues/68119
|
||||
//
|
||||
// We could get away without async block at all, but then we
|
||||
// need to resort to futures::Either to merge the result,
|
||||
// which doesn't caress an eye as well.
|
||||
let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
|
||||
async move {
|
||||
if req.headers().get("content-type").map(|x| x.as_bytes())
|
||||
== Some(b"application/grpc")
|
||||
{
|
||||
let res_resp = storage_broker_server_svc.call(req).await;
|
||||
// Grpc and http1 handlers have slightly different
|
||||
// Response types: it is UnsyncBoxBody for the
|
||||
// former one (not sure why) and plain hyper::Body
|
||||
// for the latter. Both implement HttpBody though,
|
||||
// and `Either` is used to merge them.
|
||||
res_resp.map(|resp| resp.map(http_body_util::Either::Left))
|
||||
} else {
|
||||
let res_resp = http1_handler(req).await;
|
||||
res_resp.map(|resp| resp.map(http_body_util::Either::Right))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
.await;
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let res = builder
|
||||
.serve_connection(TokioIo::new(stream), service_fn_)
|
||||
.await;
|
||||
|
||||
if let Err(e) = res {
|
||||
info!("error serving connection from {addr}: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
use hyper::body::HttpBody;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use hyper_1 as hyper;
|
||||
use std::time::Duration;
|
||||
use tonic::codegen::StdError;
|
||||
use tonic::transport::{ClientTlsConfig, Endpoint};
|
||||
@@ -94,56 +92,3 @@ pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTime
|
||||
timeline_id,
|
||||
})
|
||||
}
|
||||
|
||||
// These several usages don't justify anyhow dependency, though it would work as
|
||||
// well.
|
||||
type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
// Provides impl HttpBody for two different types implementing it. Inspired by
|
||||
// https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs
|
||||
pub enum EitherBody<A, B> {
|
||||
Left(A),
|
||||
Right(B),
|
||||
}
|
||||
|
||||
impl<A, B> HttpBody for EitherBody<A, B>
|
||||
where
|
||||
A: HttpBody + Send + Unpin,
|
||||
B: HttpBody<Data = A::Data> + Send + Unpin,
|
||||
A::Error: Into<AnyError>,
|
||||
B::Error: Into<AnyError>,
|
||||
{
|
||||
type Data = A::Data;
|
||||
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
fn is_end_stream(&self) -> bool {
|
||||
match self {
|
||||
EitherBody::Left(b) => b.is_end_stream(),
|
||||
EitherBody::Right(b) => b.is_end_stream(),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_data(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
|
||||
match self.get_mut() {
|
||||
EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
|
||||
EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_trailers(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
|
||||
match self.get_mut() {
|
||||
EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
|
||||
EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn map_option_err<T, U: Into<AnyError>>(err: Option<Result<T, U>>) -> Option<Result<T, AnyError>> {
|
||||
err.map(|e| e.map_err(Into::into))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user