proxy: Unalias hyper1 and replace one use of hyper0 in test (#9324)

Leaves one final use of hyper0 in proxy for the health service,
which requires some coordinated effort with other services.
This commit is contained in:
Folke Behrens
2024-10-09 12:44:17 +02:00
committed by GitHub
parent 8a138db8b7
commit 54d1185789
13 changed files with 74 additions and 74 deletions

View File

@@ -39,7 +39,7 @@ http.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
hyper0.workspace = true
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
hyper = { workspace = true, features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
http-body-util = { version = "0.1" }
indexmap.workspace = true

View File

@@ -571,7 +571,7 @@ mod tests {
use bytes::Bytes;
use http::Response;
use http_body_util::Full;
use hyper1::service::service_fn;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use rand::rngs::OsRng;
use rsa::pkcs8::DecodePrivateKey;
@@ -736,7 +736,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
});
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
let server = hyper1::server::conn::http1::Builder::new();
let server = hyper::server::conn::http1::Builder::new();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {

View File

@@ -1,5 +1,5 @@
use anyhow::{anyhow, bail};
use hyper::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use hyper0::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use measured::{text::BufferedTextEncoder, MetricGroup};
use metrics::NeonMetrics;
use std::{
@@ -21,7 +21,7 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, "")
}
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper::Body, ApiError> {
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper0::Body, ApiError> {
let state = Arc::new(Mutex::new(PrometheusHandler {
encoder: BufferedTextEncoder::new(),
metrics,
@@ -45,7 +45,7 @@ pub async fn task_main(
let service = || RouterService::new(make_router(metrics).build()?);
hyper::Server::from_tcp(http_listener)?
hyper0::Server::from_tcp(http_listener)?
.serve(service().map_err(|e| anyhow!(e))?)
.await?;

View File

@@ -9,7 +9,7 @@ use std::time::Duration;
use anyhow::bail;
use bytes::Bytes;
use http_body_util::BodyExt;
use hyper1::body::Body;
use hyper::body::Body;
use serde::de::DeserializeOwned;
pub(crate) use reqwest::{Request, Response};

View File

@@ -90,8 +90,6 @@ use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use tracing::warn;
extern crate hyper0 as hyper;
pub mod auth;
pub mod cache;
pub mod cancellation;

View File

@@ -7,7 +7,7 @@ use crate::metrics::{
WakeupFailureKind,
};
use crate::proxy::retry::{retry_after, should_retry};
use hyper1::StatusCode;
use hyper::StatusCode;
use tracing::{error, info, warn};
use super::connect_compute::ComputeConnectBackend;

View File

@@ -257,7 +257,7 @@ pub(crate) enum LocalProxyConnError {
#[error("error with connection to local-proxy")]
Io(#[source] std::io::Error),
#[error("could not establish h2 connection")]
H2(#[from] hyper1::Error),
H2(#[from] hyper::Error),
}
impl ReportableError for HttpConnError {
@@ -481,7 +481,7 @@ async fn connect_http2(
};
};
let (client, connection) = hyper1::client::conn::http2::Builder::new(TokioExecutor::new())
let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
.timer(TokioTimer::new())
.keep_alive_interval(Duration::from_secs(20))
.keep_alive_while_idle(true)

View File

@@ -1,5 +1,5 @@
use dashmap::DashMap;
use hyper1::client::conn::http2;
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use rand::Rng;
@@ -18,9 +18,9 @@ use tracing::{info, info_span, Instrument};
use super::conn_pool::ConnInfo;
pub(crate) type Send = http2::SendRequest<hyper1::body::Incoming>;
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
pub(crate) type Connect =
http2::Connection<TokioIo<TcpStream>, hyper1::body::Incoming, TokioExecutor>;
http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;
#[derive(Clone)]
struct ConnPoolEntry {

View File

@@ -11,7 +11,7 @@ use serde::Serialize;
use utils::http::error::ApiError;
/// Like [`ApiError::into_response`]
pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper1::Error>> {
pub(crate) fn api_error_into_response(this: ApiError) -> Response<BoxBody<Bytes, hyper::Error>> {
match this {
ApiError::BadRequest(err) => HttpErrorBody::response_from_msg_and_status(
format!("{err:#?}"), // use debug printing so that we give the cause
@@ -67,12 +67,12 @@ impl HttpErrorBody {
fn response_from_msg_and_status(
msg: String,
status: StatusCode,
) -> Response<BoxBody<Bytes, hyper1::Error>> {
) -> Response<BoxBody<Bytes, hyper::Error>> {
HttpErrorBody { msg }.to_response(status)
}
/// Same as [`utils::http::error::HttpErrorBody::to_response`]
fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper1::Error>> {
fn to_response(&self, status: StatusCode) -> Response<BoxBody<Bytes, hyper::Error>> {
Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, "application/json")
@@ -90,7 +90,7 @@ impl HttpErrorBody {
pub(crate) fn json_response<T: Serialize>(
status: StatusCode,
data: T,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let json = serde_json::to_string(&data)
.context("Failed to serialize JSON response")
.map_err(ApiError::InternalServerError)?;

View File

@@ -22,7 +22,7 @@ use futures::TryFutureExt;
use http::{Method, Response, StatusCode};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Empty};
use hyper1::body::Incoming;
use hyper::body::Incoming;
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder;
use rand::rngs::StdRng;
@@ -302,7 +302,7 @@ async fn connection_handler(
let server = Builder::new(TokioExecutor::new());
let conn = server.serve_connection_with_upgrades(
hyper_util::rt::TokioIo::new(conn),
hyper1::service::service_fn(move |req: hyper1::Request<Incoming>| {
hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
// First HTTP request shares the same session ID
let session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
@@ -355,7 +355,7 @@ async fn connection_handler(
#[allow(clippy::too_many_arguments)]
async fn request_handler(
mut request: hyper1::Request<Incoming>,
mut request: hyper::Request<Incoming>,
config: &'static ProxyConfig,
backend: Arc<PoolingBackend>,
ws_connections: TaskTracker,
@@ -365,7 +365,7 @@ async fn request_handler(
// used to cancel in-flight HTTP requests. not used to cancel websockets
http_cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let host = request
.headers()
.get("host")

View File

@@ -12,14 +12,14 @@ use http::Method;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper1::body::Body;
use hyper1::body::Incoming;
use hyper1::header;
use hyper1::http::HeaderName;
use hyper1::http::HeaderValue;
use hyper1::Response;
use hyper1::StatusCode;
use hyper1::{HeaderMap, Request};
use hyper::body::Body;
use hyper::body::Incoming;
use hyper::header;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
use hyper::Response;
use hyper::StatusCode;
use hyper::{HeaderMap, Request};
use pq_proto::StartupMessageParamsBuilder;
use serde::Serialize;
use serde_json::Value;
@@ -272,7 +272,7 @@ pub(crate) async fn handle(
request: Request<Incoming>,
backend: Arc<PoolingBackend>,
cancel: CancellationToken,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, ApiError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let result = handle_inner(cancel, config, &ctx, request, backend).await;
let mut response = match result {
@@ -435,7 +435,7 @@ impl UserFacingError for SqlOverHttpError {
#[derive(Debug, thiserror::Error)]
pub(crate) enum ReadPayloadError {
#[error("could not read the HTTP request body: {0}")]
Read(#[from] hyper1::Error),
Read(#[from] hyper::Error),
#[error("could not parse the HTTP request body: {0}")]
Parse(#[from] serde_json::Error),
}
@@ -476,7 +476,7 @@ struct HttpHeaders {
}
impl HttpHeaders {
fn try_parse(headers: &hyper1::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
fn try_parse(headers: &hyper::http::HeaderMap) -> Result<Self, SqlOverHttpError> {
// Determine the output options. Default behaviour is 'false'. Anything that is not
// strictly 'true' assumed to be false.
let raw_output = headers.get(&RAW_TEXT_OUTPUT) == Some(&HEADER_VALUE_TRUE);
@@ -529,7 +529,7 @@ async fn handle_inner(
ctx: &RequestMonitoring,
request: Request<Incoming>,
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
let _requeset_gauge = Metrics::get()
.proxy
.connection_requests
@@ -577,7 +577,7 @@ async fn handle_db_inner(
conn_info: ConnInfo,
auth: AuthData,
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
//
// Determine the destination and connection params
//
@@ -744,7 +744,7 @@ async fn handle_auth_broker_inner(
conn_info: ConnInfo,
jwt: String,
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper1::Error>>, SqlOverHttpError> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
backend
.authenticate_with_jwt(
ctx,

View File

@@ -12,7 +12,7 @@ use anyhow::Context as _;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use framed_websockets::{Frame, OpCode, WebSocketServer};
use futures::{Sink, Stream};
use hyper1::upgrade::OnUpgrade;
use hyper::upgrade::OnUpgrade;
use hyper_util::rt::TokioIo;
use pin_project_lite::pin_project;

View File

@@ -485,49 +485,51 @@ async fn upload_events_chunk(
#[cfg(test)]
mod tests {
use std::{
net::TcpListener,
sync::{Arc, Mutex},
};
use super::*;
use crate::{http, BranchId, EndpointId};
use anyhow::Error;
use chrono::Utc;
use consumption_metrics::{Event, EventChunk};
use hyper::{
service::{make_service_fn, service_fn},
Body, Response,
};
use http_body_util::BodyExt;
use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request, Response};
use hyper_util::rt::TokioIo;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
use url::Url;
use super::*;
use crate::{http, BranchId, EndpointId};
#[tokio::test]
async fn metrics() {
let listener = TcpListener::bind("0.0.0.0:0").unwrap();
type Report = EventChunk<'static, Event<Ids, String>>;
let reports: Arc<Mutex<Vec<Report>>> = Arc::default();
let reports = Arc::new(Mutex::new(vec![]));
let reports2 = reports.clone();
let server = hyper::server::Server::from_tcp(listener)
.unwrap()
.serve(make_service_fn(move |_| {
let reports = reports.clone();
async move {
Ok::<_, Error>(service_fn(move |req| {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn({
let reports = reports.clone();
async move {
loop {
if let Ok((stream, _addr)) = listener.accept().await {
let reports = reports.clone();
async move {
let bytes = hyper::body::to_bytes(req.into_body()).await?;
let events: EventChunk<'static, Event<Ids, String>> =
serde_json::from_slice(&bytes)?;
reports.lock().unwrap().push(events);
Ok::<_, Error>(Response::new(Body::from(vec![])))
}
}))
http1::Builder::new()
.serve_connection(
TokioIo::new(stream),
service_fn(move |req: Request<Incoming>| {
let reports = reports.clone();
async move {
let bytes = req.into_body().collect().await?.to_bytes();
let events = serde_json::from_slice(&bytes)?;
reports.lock().unwrap().push(events);
Ok::<_, Error>(Response::new(String::new()))
}
}),
)
.await
.unwrap();
}
}
}));
let addr = server.local_addr();
tokio::spawn(server);
}
});
let metrics = Metrics::default();
let client = http::new_client();
@@ -536,7 +538,7 @@ mod tests {
// no counters have been registered
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
let r = std::mem::take(&mut *reports.lock().unwrap());
assert!(r.is_empty());
// register a new counter
@@ -548,7 +550,7 @@ mod tests {
// the counter should be observed despite 0 egress
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 0);
@@ -558,7 +560,7 @@ mod tests {
// egress should be observered
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
let r = std::mem::take(&mut *reports.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 1);
@@ -568,7 +570,7 @@ mod tests {
// we do not observe the counter
collect_metrics_iteration(&metrics.endpoints, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
let r = std::mem::take(&mut *reports.lock().unwrap());
assert!(r.is_empty());
// counter is unregistered