Proxy: improve http-pool (#6577)

## Problem

The password check logic for the sql-over-http is a bit non-intuitive. 

## Summary of changes

1. Perform scram auth using the same logic as for websocket cleartext
password.
2. Split establish connection logic and connection pool.
3. Parallelize param parsing logic with authentication + wake compute.
4. Limit the total number of clients
This commit is contained in:
Anna Khanova
2024-02-08 12:57:05 +01:00
committed by GitHub
parent c52495774d
commit c63e3e7e84
16 changed files with 753 additions and 478 deletions

View File

@@ -13,6 +13,7 @@ use hyper::StatusCode;
use hyper::{Body, HeaderMap, Request};
use serde_json::json;
use serde_json::Value;
use tokio::join;
use tokio_postgres::error::DbError;
use tokio_postgres::error::ErrorPosition;
use tokio_postgres::GenericClient;
@@ -20,6 +21,7 @@ use tokio_postgres::IsolationLevel;
use tokio_postgres::ReadyForQueryStatus;
use tokio_postgres::Transaction;
use tracing::error;
use tracing::info;
use tracing::instrument;
use url::Url;
use utils::http::error::ApiError;
@@ -27,22 +29,25 @@ use utils::http::json::json_response;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::endpoint_sni;
use crate::config::HttpConfig;
use crate::config::ProxyConfig;
use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
use crate::metrics::HTTP_CONTENT_LENGTH;
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
use crate::proxy::NeonOptions;
use crate::RoleName;
use super::backend::PoolingBackend;
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
use super::json::{json_to_pg_text, pg_text_row_to_json};
use super::json::json_to_pg_text;
use super::json::pg_text_row_to_json;
use super::SERVERLESS_DRIVER_SNI;
#[derive(serde::Deserialize)]
struct QueryData {
query: String,
params: Vec<serde_json::Value>,
#[serde(deserialize_with = "bytes_to_pg_text")]
params: Vec<Option<String>>,
}
#[derive(serde::Deserialize)]
@@ -69,6 +74,15 @@ static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrab
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
fn bytes_to_pg_text<'de, D>(deserializer: D) -> Result<Vec<Option<String>>, D::Error>
where
D: serde::de::Deserializer<'de>,
{
// TODO: consider avoiding the allocation here.
let json: Vec<Value> = serde::de::Deserialize::deserialize(deserializer)?;
Ok(json_to_pg_text(json))
}
fn get_conn_info(
ctx: &mut RequestMonitoring,
headers: &HeaderMap,
@@ -171,16 +185,15 @@ fn check_matches(sni_hostname: &str, hostname: &str) -> Result<bool, anyhow::Err
// TODO: return different http error codes
pub async fn handle(
tls: &'static TlsConfig,
config: &'static HttpConfig,
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
backend: Arc<PoolingBackend>,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
config.request_timeout,
handle_inner(tls, config, ctx, request, sni_hostname, conn_pool),
config.http_config.request_timeout,
handle_inner(config, ctx, request, sni_hostname, backend),
)
.await;
let mut response = match result {
@@ -265,7 +278,7 @@ pub async fn handle(
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
config.request_timeout.as_secs()
config.http_config.request_timeout.as_secs()
);
error!(message);
json_response(
@@ -283,22 +296,36 @@ pub async fn handle(
#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
async fn handle_inner(
tls: &'static TlsConfig,
config: &'static HttpConfig,
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
backend: Arc<PoolingBackend>,
) -> anyhow::Result<Response<Body>> {
let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
.with_label_values(&["http"])
.with_label_values(&[ctx.protocol])
.guard();
info!(
protocol = ctx.protocol,
"handling interactive connection from client"
);
//
// Determine the destination and connection params
//
let headers = request.headers();
let conn_info = get_conn_info(ctx, headers, sni_hostname, tls)?;
// TLS config should be there.
let conn_info = get_conn_info(
ctx,
headers,
sni_hostname,
config.tls_config.as_ref().unwrap(),
)?;
info!(
user = conn_info.user_info.user.as_str(),
project = conn_info.user_info.endpoint.as_str(),
"credentials"
);
// Determine the output options. Default behaviour is 'false'. Anything that is not
// strictly 'true' assumed to be false.
@@ -307,8 +334,8 @@ async fn handle_inner(
// Allow connection pooling only if explicitly requested
// or if we have decided that http pool is no longer opt-in
let allow_pool =
!config.pool_options.opt_in || headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
let allow_pool = !config.http_config.pool_options.opt_in
|| headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
// isolation level, read only and deferrable
@@ -333,6 +360,8 @@ async fn handle_inner(
None => MAX_REQUEST_SIZE + 1,
};
drop(paused);
info!(request_content_length, "request size in bytes");
HTTP_CONTENT_LENGTH.observe(request_content_length as f64);
// we don't have a streaming request support yet so this is to prevent OOM
// from a malicious user sending an extremely large request body
@@ -342,13 +371,28 @@ async fn handle_inner(
));
}
//
// Read the query and query params from the request body
//
let body = hyper::body::to_bytes(request.into_body()).await?;
let payload: Payload = serde_json::from_slice(&body)?;
let fetch_and_process_request = async {
let body = hyper::body::to_bytes(request.into_body())
.await
.map_err(anyhow::Error::from)?;
let payload: Payload = serde_json::from_slice(&body)?;
Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly
};
let mut client = conn_pool.get(ctx, conn_info, !allow_pool).await?;
let authenticate_and_connect = async {
let keys = backend.authenticate(ctx, &conn_info).await?;
backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await
};
// Run both operations in parallel
let (payload_result, auth_and_connect_result) =
join!(fetch_and_process_request, authenticate_and_connect,);
// Handle the results
let payload = payload_result?; // Handle errors appropriately
let mut client = auth_and_connect_result?; // Handle errors appropriately
let mut response = Response::builder()
.status(StatusCode::OK)
@@ -482,7 +526,7 @@ async fn query_to_json<T: GenericClient>(
raw_output: bool,
array_mode: bool,
) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
let query_params = json_to_pg_text(data.params);
let query_params = data.params;
let row_stream = client.query_raw_txt(&data.query, query_params).await?;
// Manually drain the stream into a vector to leave row_stream hanging