mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-25 23:29:59 +00:00
proxy: propagate session IDs (#9509)
fixes #9367 by sending session IDs to local_proxy, and also returns session IDs to the client for easier debugging.
This commit is contained in:
@@ -32,6 +32,7 @@ use hyper_util::rt::TokioExecutor;
|
|||||||
use hyper_util::server::conn::auto::Builder;
|
use hyper_util::server::conn::auto::Builder;
|
||||||
use rand::rngs::StdRng;
|
use rand::rngs::StdRng;
|
||||||
use rand::SeedableRng;
|
use rand::SeedableRng;
|
||||||
|
use sql_over_http::{uuid_to_header_value, NEON_REQUEST_ID};
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
@@ -309,7 +310,18 @@ async fn connection_handler(
|
|||||||
hyper_util::rt::TokioIo::new(conn),
|
hyper_util::rt::TokioIo::new(conn),
|
||||||
hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
|
hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
|
||||||
// First HTTP request shares the same session ID
|
// First HTTP request shares the same session ID
|
||||||
let session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
|
let mut session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
|
||||||
|
|
||||||
|
if matches!(backend.auth_backend, crate::auth::Backend::Local(_)) {
|
||||||
|
// take session_id from request, if given.
|
||||||
|
if let Some(id) = req
|
||||||
|
.headers()
|
||||||
|
.get(&NEON_REQUEST_ID)
|
||||||
|
.and_then(|id| uuid::Uuid::try_parse_ascii(id.as_bytes()).ok())
|
||||||
|
{
|
||||||
|
session_id = id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Cancel the current inflight HTTP request if the requets stream is closed.
|
// Cancel the current inflight HTTP request if the requets stream is closed.
|
||||||
// This is slightly different to `_cancel_connection` in that
|
// This is slightly different to `_cancel_connection` in that
|
||||||
@@ -335,8 +347,15 @@ async fn connection_handler(
|
|||||||
.map_ok_or_else(api_error_into_response, |r| r),
|
.map_ok_or_else(api_error_into_response, |r| r),
|
||||||
);
|
);
|
||||||
async move {
|
async move {
|
||||||
let res = handler.await;
|
let mut res = handler.await;
|
||||||
cancel_request.disarm();
|
cancel_request.disarm();
|
||||||
|
|
||||||
|
// add the session ID to the response
|
||||||
|
if let Ok(resp) = &mut res {
|
||||||
|
resp.headers_mut()
|
||||||
|
.append(&NEON_REQUEST_ID, uuid_to_header_value(session_id));
|
||||||
|
}
|
||||||
|
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ use typed_json::json;
|
|||||||
use url::Url;
|
use url::Url;
|
||||||
use urlencoding;
|
use urlencoding;
|
||||||
use utils::http::error::ApiError;
|
use utils::http::error::ApiError;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use super::backend::{LocalProxyConnError, PoolingBackend};
|
use super::backend::{LocalProxyConnError, PoolingBackend};
|
||||||
use super::conn_pool::{AuthData, ConnInfoWithAuth};
|
use super::conn_pool::{AuthData, ConnInfoWithAuth};
|
||||||
@@ -63,6 +64,8 @@ enum Payload {
|
|||||||
Batch(BatchQueryData),
|
Batch(BatchQueryData),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) static NEON_REQUEST_ID: HeaderName = HeaderName::from_static("neon-request-id");
|
||||||
|
|
||||||
static CONN_STRING: HeaderName = HeaderName::from_static("neon-connection-string");
|
static CONN_STRING: HeaderName = HeaderName::from_static("neon-connection-string");
|
||||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||||
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
|
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
|
||||||
@@ -706,6 +709,12 @@ static HEADERS_TO_FORWARD: &[&HeaderName] = &[
|
|||||||
&TXN_DEFERRABLE,
|
&TXN_DEFERRABLE,
|
||||||
];
|
];
|
||||||
|
|
||||||
|
pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue {
|
||||||
|
let mut uuid = [0; uuid::fmt::Hyphenated::LENGTH];
|
||||||
|
HeaderValue::from_str(id.as_hyphenated().encode_lower(&mut uuid[..]))
|
||||||
|
.expect("uuid hyphenated format should be all valid header characters")
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_auth_broker_inner(
|
async fn handle_auth_broker_inner(
|
||||||
ctx: &RequestMonitoring,
|
ctx: &RequestMonitoring,
|
||||||
request: Request<Incoming>,
|
request: Request<Incoming>,
|
||||||
@@ -732,6 +741,7 @@ async fn handle_auth_broker_inner(
|
|||||||
req = req.header(h, hv);
|
req = req.header(h, hv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id()));
|
||||||
|
|
||||||
let req = req
|
let req = req
|
||||||
.body(body)
|
.body(body)
|
||||||
|
|||||||
Reference in New Issue
Block a user