Mirror of https://github.com/neondatabase/neon.git, synced 2025-12-22 21:59:59 +00:00
[proxy] refactor logging ID system
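
Summary (inferred from the diff below): bare uuid::Uuid values used for logging are replaced with typed IDs built on the type-safe-id crate. A new proxy/src/id.rs defines ClientConnId for client connections, ComputeConnId for compute connections, and RequestId for authentication requests. RequestContext now carries a conn_id alongside the session_id, tracing fields switch from Debug capture (?session_id) to Display capture (%session_id) so the type prefix shows up in logs, and the serverless path gains get_request_id/set_request_id helpers for propagating the request ID through the NEON_REQUEST_ID header.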
Cargo.lock (generated): 14 changes
@@ -5406,6 +5406,7 @@ dependencies = [
  "tracing-test",
  "tracing-utils",
  "try-lock",
+ "type-safe-id",
  "typed-json",
  "url",
  "urlencoding",
@@ -8087,6 +8088,19 @@ dependencies = [
  "static_assertions",
 ]
 
+[[package]]
+name = "type-safe-id"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd9267f90719e0433aae095640b294ff36ccbf89649ecb9ee34464ec504be157"
+dependencies = [
+ "arrayvec",
+ "rand 0.9.1",
+ "serde",
+ "thiserror 2.0.11",
+ "uuid",
+]
+
 [[package]]
 name = "typed-json"
 version = "0.1.1"
@@ -98,6 +98,7 @@ tracing-log.workspace = true
 tracing-opentelemetry.workspace = true
 try-lock.workspace = true
 typed-json.workspace = true
+type-safe-id = { version = "0.3.3", features = ["serde"] }
 url.workspace = true
 urlencoding.workspace = true
 utils.workspace = true
@@ -26,6 +26,7 @@ use utils::project_git_version;
 use utils::sentry_init::init_sentry;
 
 use crate::context::RequestContext;
+use crate::id::{ClientConnId, RequestId};
 use crate::metrics::{Metrics, ThreadPoolMetrics};
 use crate::pglb::TlsRequired;
 use crate::pqproto::FeStartupPacket;
@@ -219,7 +220,8 @@ pub(super) async fn task_main(
 {
 let (socket, peer_addr) = accept_result?;
 
-let session_id = uuid::Uuid::new_v4();
+let conn_id = ClientConnId::new();
+let session_id = RequestId::from_uuid(conn_id.uuid());
 let tls_config = Arc::clone(&tls_config);
 let dest_suffix = Arc::clone(&dest_suffix);
 let compute_tls_config = compute_tls_config.clone();
@@ -231,6 +233,7 @@ pub(super) async fn task_main(
 .context("failed to set socket option")?;
 
 let ctx = RequestContext::new(
+conn_id,
 session_id,
 ConnectionInfo {
 addr: peer_addr,
@@ -252,7 +255,7 @@ pub(super) async fn task_main(
 // Acknowledge that the task has finished with an error.
 error!("per-client task finished with an error: {e:#}");
 })
-.instrument(tracing::info_span!("handle_client", ?session_id)),
+.instrument(tracing::info_span!("handle_client", %session_id)),
 );
 }
proxy/src/cache/project_info.rs (vendored): 3 changes
@@ -363,11 +363,12 @@ impl ProjectInfoCacheImpl {
 
 #[cfg(test)]
 mod tests {
+use std::sync::Arc;
+
 use super::*;
 use crate::control_plane::messages::{Details, EndpointRateLimitConfig, ErrorInfo, Status};
 use crate::control_plane::{AccessBlockerFlags, AuthSecret};
 use crate::scram::ServerSecret;
-use std::sync::Arc;
 
 #[tokio::test]
 async fn test_project_info_cache_settings() {
@@ -23,6 +23,7 @@ use crate::context::RequestContext;
 use crate::control_plane::ControlPlaneApi;
 use crate::error::ReportableError;
 use crate::ext::LockExt;
+use crate::id::RequestId;
 use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind};
 use crate::pqproto::CancelKeyData;
 use crate::rate_limiter::LeakyBucketRateLimiter;
@@ -486,7 +487,7 @@ impl Session {
 /// This is not cancel safe
 pub(crate) async fn maintain_cancel_key(
 &self,
-session_id: uuid::Uuid,
+session_id: RequestId,
 cancel: tokio::sync::oneshot::Receiver<Infallible>,
 cancel_closure: &CancelClosure,
 compute_config: &ComputeConfig,
@@ -599,7 +600,7 @@ impl Session {
 .await
 {
 tracing::warn!(
-?session_id,
+%session_id,
 ?err,
 "could not cancel the query in the database"
 );
@@ -25,6 +25,7 @@ use crate::control_plane::client::ApiLockError;
 use crate::control_plane::errors::WakeComputeError;
 use crate::control_plane::messages::MetricsAuxInfo;
 use crate::error::{ReportableError, UserFacingError};
+use crate::id::ComputeConnId;
 use crate::metrics::{Metrics, NumDbConnectionsGuard};
 use crate::pqproto::StartupMessageParams;
 use crate::proxy::neon_option;
@@ -356,6 +357,7 @@ pub struct PostgresSettings {
 }
 
 pub struct ComputeConnection {
+pub compute_conn_id: ComputeConnId,
 /// Socket connected to a compute node.
 pub stream: MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
 /// Labels for proxy's metrics.
@@ -373,6 +375,7 @@ impl ConnectInfo {
 ctx: &RequestContext,
 aux: &MetricsAuxInfo,
 config: &ComputeConfig,
+compute_conn_id: ComputeConnId,
 ) -> Result<ComputeConnection, ConnectionError> {
 let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
 let (socket_addr, stream) = self.connect_raw(config).await?;
@@ -382,6 +385,7 @@ impl ConnectInfo {
 
 // TODO: lots of useful info but maybe we can move it elsewhere (eg traces?)
 info!(
+%compute_conn_id,
 cold_start_info = ctx.cold_start_info().as_str(),
 "connected to compute node at {} ({socket_addr}) sslmode={:?}, latency={}, query_id={}",
 self.host,
@@ -391,6 +395,7 @@ impl ConnectInfo {
 );
 
 let connection = ComputeConnection {
+compute_conn_id,
 stream,
 socket_addr,
 hostname: self.host.clone(),
@@ -10,7 +10,8 @@ use crate::cancellation::CancellationHandler;
 use crate::config::{ProxyConfig, ProxyProtocolV2};
 use crate::context::RequestContext;
 use crate::error::ReportableError;
-use crate::metrics::{Metrics, NumClientConnectionsGuard};
+use crate::id::{ClientConnId, ComputeConnId, RequestId};
+use crate::metrics::{Metrics, NumClientConnectionsGuard, Protocol};
 use crate::pglb::ClientRequestError;
 use crate::pglb::handshake::{HandshakeData, handshake};
 use crate::pglb::passthrough::ProxyPassthrough;
@@ -42,12 +43,10 @@ pub async fn task_main(
 {
 let (socket, peer_addr) = accept_result?;
 
-let conn_gauge = Metrics::get()
-.proxy
-.client_connections
-.guard(crate::metrics::Protocol::Tcp);
+let conn_gauge = Metrics::get().proxy.client_connections.guard(Protocol::Tcp);
 
-let session_id = uuid::Uuid::new_v4();
+let conn_id = ClientConnId::new();
+let session_id = RequestId::from_uuid(conn_id.uuid());
 let cancellation_handler = Arc::clone(&cancellation_handler);
 let cancellations = cancellations.clone();
@@ -90,7 +89,7 @@ pub async fn task_main(
 }
 }
 
-let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp);
+let ctx = RequestContext::new(conn_id, session_id, conn_info, Protocol::Tcp);
 
 let res = handle_client(
 config,
@@ -120,13 +119,13 @@ pub async fn task_main(
 Ok(()) => {}
 Err(ErrorSource::Client(e)) => {
 error!(
-?session_id,
+%session_id,
 "per-client task finished with an IO error from the client: {e:#}"
 );
 }
 Err(ErrorSource::Compute(e)) => {
 error!(
-?session_id,
+%session_id,
 "per-client task finished with an IO error from the compute: {e:#}"
 );
 }
@@ -214,10 +213,14 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
 };
 auth_info.set_startup_params(&params, true);
 
+// for TCP/WS, we have client_id=session_id=compute_id for now.
+let compute_conn_id = ComputeConnId::from_uuid(ctx.session_id().uuid());
+
 let mut node = connect_to_compute(
 ctx,
 &TcpMechanism {
 locks: &config.connect_compute_locks,
+compute_conn_id,
 },
 &node_info,
 config.wake_compute_retry_config,
@@ -250,6 +253,8 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
 });
 
 Ok(Some(ProxyPassthrough {
+compute_conn_id: node.compute_conn_id,
+
 client: stream,
 compute: node.stream,
@@ -9,11 +9,11 @@ use tokio::sync::mpsc;
 use tracing::field::display;
 use tracing::{Span, error, info_span};
 use try_lock::TryLock;
-use uuid::Uuid;
 
 use self::parquet::RequestData;
 use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
 use crate::error::ErrorKind;
+use crate::id::{ClientConnId, RequestId};
 use crate::intern::{BranchIdInt, ProjectIdInt};
 use crate::metrics::{LatencyAccumulated, LatencyTimer, Metrics, Protocol, Waiting};
 use crate::pqproto::StartupMessageParams;
@@ -40,7 +40,7 @@ pub struct RequestContext(
 
 struct RequestContextInner {
 pub(crate) conn_info: ConnectionInfo,
-pub(crate) session_id: Uuid,
+pub(crate) session_id: RequestId,
 pub(crate) protocol: Protocol,
 first_packet: chrono::DateTime<Utc>,
 pub(crate) span: Span,
@@ -116,12 +116,18 @@ impl Clone for RequestContext {
 }
 
 impl RequestContext {
-pub fn new(session_id: Uuid, conn_info: ConnectionInfo, protocol: Protocol) -> Self {
+pub fn new(
+conn_id: ClientConnId,
+session_id: RequestId,
+conn_info: ConnectionInfo,
+protocol: Protocol,
+) -> Self {
 // TODO: be careful with long lived spans
 let span = info_span!(
 "connect_request",
 %protocol,
-?session_id,
+%session_id,
+%conn_id,
 %conn_info,
 ep = tracing::field::Empty,
 role = tracing::field::Empty,
@@ -164,7 +170,13 @@ impl RequestContext {
 let ip = IpAddr::from([127, 0, 0, 1]);
 let addr = SocketAddr::new(ip, 5432);
 let conn_info = ConnectionInfo { addr, extra: None };
-RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp)
+let uuid = uuid::Uuid::now_v7();
+RequestContext::new(
+ClientConnId::from_uuid(uuid),
+RequestId::from_uuid(uuid),
+conn_info,
+Protocol::Tcp,
+)
 }
 
 pub(crate) fn console_application_name(&self) -> String {
@@ -311,7 +323,7 @@ impl RequestContext {
 self.0.try_lock().expect("should not deadlock").span.clone()
 }
 
-pub(crate) fn session_id(&self) -> Uuid {
+pub(crate) fn session_id(&self) -> RequestId {
 self.0.try_lock().expect("should not deadlock").session_id
 }
@@ -124,7 +124,7 @@ impl serde::Serialize for Options<'_> {
 impl From<&RequestContextInner> for RequestData {
 fn from(value: &RequestContextInner) -> Self {
 Self {
-session_id: value.session_id,
+session_id: value.session_id.uuid(),
 peer_addr: value.conn_info.addr.ip().to_string(),
 timestamp: value.first_packet.naive_utc(),
 username: value.user.as_deref().map(String::from),
@@ -20,6 +20,7 @@ use crate::cache::{Cached, TimedLru};
 use crate::config::ComputeConfig;
 use crate::context::RequestContext;
 use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
+use crate::id::ComputeConnId;
 use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt};
 use crate::protocol2::ConnectionInfoExtra;
 use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig};
@@ -77,8 +78,11 @@ impl NodeInfo {
 &self,
 ctx: &RequestContext,
 config: &ComputeConfig,
+compute_conn_id: ComputeConnId,
 ) -> Result<compute::ComputeConnection, compute::ConnectionError> {
-self.conn_info.connect(ctx, &self.aux, config).await
+self.conn_info
+.connect(ctx, &self.aux, config, compute_conn_id)
+.await
 }
 }
proxy/src/id.rs (new file): 33 lines
@@ -0,0 +1,33 @@
+//! Various ID types used by proxy.
+
+use type_safe_id::{StaticType, TypeSafeId};
+
+/// The ID used for the client connection
+pub type ClientConnId = TypeSafeId<ClientConn>;
+
+#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
+pub struct ClientConn;
+
+impl StaticType for ClientConn {
+// This is visible by customers, so we use 'neon' here instead of 'client'.
+const TYPE: &'static str = "neon_conn";
+}
+
+/// The ID used for the compute connection
+pub type ComputeConnId = TypeSafeId<ComputeConn>;
+
+#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
+pub struct ComputeConn;
+
+impl StaticType for ComputeConn {
+const TYPE: &'static str = "compute_conn";
+}
+
+/// The ID used for the request to authenticate
+pub type RequestId = TypeSafeId<Request>;
+#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
+pub struct Request;
+
+impl StaticType for Request {
+const TYPE: &'static str = "request";
+}
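
For readers unfamiliar with the crate: type_safe_id renders each ID as a prefixed TypeID string, which is why the tracing fields in this diff move from Debug capture (?session_id) to Display capture (%session_id). Below is a minimal standalone sketch of the API this commit relies on (ClientConnId::new, .uuid(), the Display form); the printed suffix in the comment is illustrative, not taken from this commit:

use type_safe_id::{StaticType, TypeSafeId};

#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
struct ClientConn;

impl StaticType for ClientConn {
    // Same prefix as proxy/src/id.rs above.
    const TYPE: &'static str = "neon_conn";
}

type ClientConnId = TypeSafeId<ClientConn>;

fn main() {
    // Fresh ID, as in `task_main` above.
    let conn_id = ClientConnId::new();

    // The session ID is derived from the same UUID so the two IDs
    // correlate in logs: `RequestId::from_uuid(conn_id.uuid())`.
    let uuid = conn_id.uuid();

    // Display form is "<prefix>_<base32-encoded uuid>", e.g.
    // "neon_conn_01h455vb4pex5vsknk084sn02q" (illustrative).
    println!("{conn_id} ({uuid})");
}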
@@ -91,6 +91,7 @@ mod control_plane;
 mod error;
 mod ext;
 mod http;
+mod id;
 mod intern;
 mod jemalloc;
 mod logging;
@@ -17,7 +17,8 @@ use crate::cancellation::{self, CancellationHandler};
 use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
 use crate::context::RequestContext;
 use crate::error::{ReportableError, UserFacingError};
-use crate::metrics::{Metrics, NumClientConnectionsGuard};
+use crate::id::{ClientConnId, RequestId};
+use crate::metrics::{Metrics, NumClientConnectionsGuard, Protocol};
 pub use crate::pglb::copy_bidirectional::ErrorSource;
 use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
 use crate::pglb::passthrough::ProxyPassthrough;
@@ -65,12 +66,11 @@ pub async fn task_main(
 {
 let (socket, peer_addr) = accept_result?;
 
-let conn_gauge = Metrics::get()
-.proxy
-.client_connections
-.guard(crate::metrics::Protocol::Tcp);
+let conn_gauge = Metrics::get().proxy.client_connections.guard(Protocol::Tcp);
 
-let session_id = uuid::Uuid::new_v4();
+let conn_id = ClientConnId::new();
+let session_id = RequestId::from_uuid(conn_id.uuid());
 let cancellation_handler = Arc::clone(&cancellation_handler);
 let cancellations = cancellations.clone();
@@ -114,7 +114,7 @@ pub async fn task_main(
 }
 }
 
-let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp);
+let ctx = RequestContext::new(conn_id, session_id, conn_info, Protocol::Tcp);
 
 let res = handle_connection(
 config,
@@ -142,17 +142,22 @@ pub async fn task_main(
 Ok(Some(p)) => {
 ctx.set_success();
 let _disconnect = ctx.log_connect();
+let compute_conn_id = p.compute_conn_id;
 match p.proxy_pass().await {
 Ok(()) => {}
 Err(ErrorSource::Client(e)) => {
 warn!(
-?session_id,
+%conn_id,
+%session_id,
+%compute_conn_id,
 "per-client task finished with an IO error from the client: {e:#}"
 );
 }
 Err(ErrorSource::Compute(e)) => {
 error!(
-?session_id,
+%conn_id,
+%session_id,
+%compute_conn_id,
 "per-client task finished with an IO error from the compute: {e:#}"
 );
 }
@@ -318,6 +323,8 @@ pub(crate) async fn handle_connection<S: AsyncRead + AsyncWrite + Unpin + Send>(
 };
 
 Ok(Some(ProxyPassthrough {
+compute_conn_id: node.compute_conn_id,
+
 client,
 compute: node.stream,
@@ -8,6 +8,7 @@ use utils::measured_stream::MeasuredStream;
 use super::copy_bidirectional::ErrorSource;
 use crate::compute::MaybeRustlsStream;
 use crate::control_plane::messages::MetricsAuxInfo;
+use crate::id::ComputeConnId;
 use crate::metrics::{
 Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard,
 NumDbConnectionsGuard,
@@ -65,6 +66,8 @@ pub(crate) async fn proxy_pass(
 }
 
 pub(crate) struct ProxyPassthrough<S> {
+pub(crate) compute_conn_id: ComputeConnId,
+
 pub(crate) client: Stream<S>,
 pub(crate) compute: MaybeRustlsStream,
@@ -9,6 +9,7 @@ use crate::control_plane::errors::WakeComputeError;
 use crate::control_plane::locks::ApiLocks;
 use crate::control_plane::{self, NodeInfo};
 use crate::error::ReportableError;
+use crate::id::ComputeConnId;
 use crate::metrics::{
 ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
 };
@@ -51,6 +52,7 @@ pub(crate) trait ConnectMechanism {
 pub(crate) struct TcpMechanism {
 /// connect_to_compute concurrency lock
 pub(crate) locks: &'static ApiLocks<Host>,
+pub(crate) compute_conn_id: ComputeConnId,
 }
 
 #[async_trait]
@@ -70,7 +72,7 @@ impl ConnectMechanism for TcpMechanism {
 config: &ComputeConfig,
 ) -> Result<ComputeConnection, Self::Error> {
 let permit = self.locks.get_permit(&node_info.conn_info.host).await?;
-permit.release_result(node_info.connect(ctx, config).await)
+permit.release_result(node_info.connect(ctx, config, self.compute_conn_id).await)
 }
 }
@@ -24,6 +24,7 @@ use crate::compute::ComputeConnection;
 use crate::config::ProxyConfig;
 use crate::context::RequestContext;
 use crate::control_plane::client::ControlPlaneClient;
+use crate::id::ComputeConnId;
 pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
 use crate::pglb::{ClientMode, ClientRequestError};
 use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams};
@@ -94,6 +95,8 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
 let mut attempt = 0;
 let connect = TcpMechanism {
 locks: &config.connect_compute_locks,
+// for TCP/WS, we have client_id=session_id=compute_id for now.
+compute_conn_id: ComputeConnId::from_uuid(ctx.session_id().uuid()),
 };
 let backend = auth::Backend::ControlPlane(cplane, creds.info);
@@ -33,6 +33,7 @@ use crate::control_plane::client::ApiLockError;
 use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
 use crate::control_plane::locks::ApiLocks;
 use crate::error::{ErrorKind, ReportableError, UserFacingError};
+use crate::id::{ComputeConnId, RequestId};
 use crate::intern::EndpointIdInt;
 use crate::proxy::connect_compute::ConnectMechanism;
 use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
@@ -161,7 +162,7 @@ impl PoolingBackend {
 #[tracing::instrument(skip_all, fields(
 pid = tracing::field::Empty,
 compute_id = tracing::field::Empty,
-conn_id = tracing::field::Empty,
+compute_conn_id = tracing::field::Empty,
 ))]
 pub(crate) async fn connect_to_compute(
 &self,
@@ -181,14 +182,14 @@ impl PoolingBackend {
 if let Some(client) = maybe_client {
 return Ok(client);
 }
-let conn_id = uuid::Uuid::new_v4();
-tracing::Span::current().record("conn_id", display(conn_id));
-info!(%conn_id, "pool: opening a new connection '{conn_info}'");
+let compute_conn_id = ComputeConnId::new();
+tracing::Span::current().record("compute_conn_id", display(compute_conn_id));
+info!(%compute_conn_id, "pool: opening a new connection '{conn_info}'");
 let backend = self.auth_backend.as_ref().map(|()| keys.info);
 crate::proxy::connect_compute::connect_to_compute(
 ctx,
 &TokioMechanism {
-conn_id,
+compute_conn_id,
 conn_info,
 pool: self.pool.clone(),
 locks: &self.config.connect_compute_locks,
@@ -204,7 +205,7 @@ impl PoolingBackend {
 // Wake up the destination if needed
 #[tracing::instrument(skip_all, fields(
 compute_id = tracing::field::Empty,
-conn_id = tracing::field::Empty,
+compute_conn_id = tracing::field::Empty,
 ))]
 pub(crate) async fn connect_to_local_proxy(
 &self,
@@ -216,9 +217,9 @@ impl PoolingBackend {
 return Ok(client);
 }
 
-let conn_id = uuid::Uuid::new_v4();
-tracing::Span::current().record("conn_id", display(conn_id));
-debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
+let compute_conn_id = ComputeConnId::new();
+tracing::Span::current().record("compute_conn_id", display(compute_conn_id));
+debug!(%compute_conn_id, "pool: opening a new connection '{conn_info}'");
 let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo {
 user: conn_info.user_info.user.clone(),
 endpoint: EndpointId::from(format!(
|
||||
crate::proxy::connect_compute::connect_to_compute(
|
||||
ctx,
|
||||
&HyperMechanism {
|
||||
conn_id,
|
||||
compute_conn_id,
|
||||
conn_info,
|
||||
pool: self.http_conn_pool.clone(),
|
||||
locks: &self.config.connect_compute_locks,
|
||||
@@ -251,7 +252,7 @@ impl PoolingBackend {
 /// Panics if called with a non-local_proxy backend.
 #[tracing::instrument(skip_all, fields(
 pid = tracing::field::Empty,
-conn_id = tracing::field::Empty,
+compute_conn_id = tracing::field::Empty,
 ))]
 pub(crate) async fn connect_to_local_postgres(
 &self,
@@ -303,9 +304,9 @@ impl PoolingBackend {
 }
 }
 
-let conn_id = uuid::Uuid::new_v4();
-tracing::Span::current().record("conn_id", display(conn_id));
-info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
+let compute_conn_id = ComputeConnId::new();
+tracing::Span::current().record("compute_conn_id", display(compute_conn_id));
+info!(%compute_conn_id, "local_pool: opening a new connection '{conn_info}'");
 
 let (key, jwk) = create_random_jwk();
@@ -340,7 +341,7 @@ impl PoolingBackend {
 client,
 connection,
 key,
-conn_id,
+compute_conn_id,
 local_backend.node_info.aux.clone(),
 );
@@ -378,7 +379,7 @@ fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum HttpConnError {
 #[error("pooled connection closed at inconsistent state")]
-ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
+ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<RequestId>),
 #[error("could not connect to postgres in compute")]
 PostgresConnectionError(#[from] postgres_client::Error),
 #[error("could not connect to local-proxy in compute")]
@@ -509,7 +510,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
 struct TokioMechanism {
 pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
 conn_info: ConnInfo,
-conn_id: uuid::Uuid,
+compute_conn_id: ComputeConnId,
 keys: ComputeCredentialKeys,
 
 /// connect_to_compute concurrency lock
@@ -561,7 +562,7 @@ impl ConnectMechanism for TokioMechanism {
 self.conn_info.clone(),
 client,
 connection,
-self.conn_id,
+self.compute_conn_id,
 node_info.aux.clone(),
 ))
 }
@@ -570,7 +571,7 @@ impl ConnectMechanism for TokioMechanism {
 struct HyperMechanism {
 pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
 conn_info: ConnInfo,
-conn_id: uuid::Uuid,
+compute_conn_id: ComputeConnId,
 
 /// connect_to_compute concurrency lock
 locks: &'static ApiLocks<Host>,
@@ -620,7 +621,7 @@ impl ConnectMechanism for HyperMechanism {
 &self.conn_info,
 client,
 connection,
-self.conn_id,
+self.compute_conn_id,
 node_info.aux.clone(),
 ))
 }
@@ -10,7 +10,8 @@ use rand::{Rng, thread_rng};
 use rustc_hash::FxHasher;
 use tokio::time::Instant;
 use tokio_util::sync::CancellationToken;
-use uuid::Uuid;
+
+use crate::id::ClientConnId;
 
 type Hasher = BuildHasherDefault<FxHasher>;
@@ -21,7 +22,7 @@ pub struct CancelSet {
 }
 
 pub(crate) struct CancelShard {
-tokens: IndexMap<uuid::Uuid, (Instant, CancellationToken), Hasher>,
+tokens: IndexMap<ClientConnId, (Instant, CancellationToken), Hasher>,
 }
 
 impl CancelSet {
@@ -53,7 +54,7 @@ impl CancelSet {
 .and_then(|len| self.shards[rng % len].lock().take(rng / len))
 }
 
-pub(crate) fn insert(&self, id: uuid::Uuid, token: CancellationToken) -> CancelGuard<'_> {
+pub(crate) fn insert(&self, id: ClientConnId, token: CancellationToken) -> CancelGuard<'_> {
 let shard = NonZeroUsize::new(self.shards.len()).map(|len| {
 let hash = self.hasher.hash_one(id) as usize;
 let shard = &self.shards[hash % len];
@@ -77,18 +78,18 @@ impl CancelShard {
 })
 }
 
-fn remove(&mut self, id: uuid::Uuid) {
+fn remove(&mut self, id: ClientConnId) {
 self.tokens.swap_remove(&id);
 }
 
-fn insert(&mut self, id: uuid::Uuid, token: CancellationToken) {
+fn insert(&mut self, id: ClientConnId, token: CancellationToken) {
 self.tokens.insert(id, (Instant::now(), token));
 }
 }
 
 pub(crate) struct CancelGuard<'a> {
 shard: Option<&'a Mutex<CancelShard>>,
-id: Uuid,
+id: ClientConnId,
 }
 
 impl Drop for CancelGuard<'_> {
@@ -26,6 +26,7 @@ use super::conn_pool_lib::{
 use crate::config::ComputeConfig;
 use crate::context::RequestContext;
 use crate::control_plane::messages::MetricsAuxInfo;
+use crate::id::{ComputeConnId, RequestId};
 use crate::metrics::Metrics;
 
 type TlsStream = <ComputeConfig as MakeTlsConnect<TcpStream>>::Stream;
@@ -62,14 +63,14 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
 conn_info: ConnInfo,
 client: C,
 mut connection: postgres_client::Connection<TcpStream, TlsStream>,
-conn_id: uuid::Uuid,
+compute_conn_id: ComputeConnId,
 aux: MetricsAuxInfo,
 ) -> Client<C> {
 let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
 let mut session_id = ctx.session_id();
 let (tx, mut rx) = tokio::sync::watch::channel(session_id);
 
-let span = info_span!(parent: None, "connection", %conn_id);
+let span = info_span!(parent: None, "connection", %compute_conn_id);
 let cold_start_info = ctx.cold_start_info();
 span.in_scope(|| {
 info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
@@ -117,7 +118,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
 if let Some(pool) = pool.clone().upgrade() {
 // remove client from pool - should close the connection if it's idle.
 // does nothing if the client is currently checked-out and in-use
-if pool.write().remove_client(db_user.clone(), conn_id) {
+if pool.write().remove_client(db_user.clone(), compute_conn_id) {
 info!("idle connection removed");
 }
 }
@@ -149,7 +150,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
 
 // remove from connection pool
 if let Some(pool) = pool.clone().upgrade()
-&& pool.write().remove_client(db_user.clone(), conn_id) {
+&& pool.write().remove_client(db_user.clone(), compute_conn_id) {
 info!("closed connection removed");
 }
@@ -161,7 +162,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
 let inner = ClientInnerCommon {
 inner: client,
 aux,
-conn_id,
+compute_conn_id,
 data: ClientDataEnum::Remote(ClientDataRemote {
 session: tx,
 cancel,
@@ -173,12 +174,12 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
 
 #[derive(Clone)]
 pub(crate) struct ClientDataRemote {
-session: tokio::sync::watch::Sender<uuid::Uuid>,
+session: tokio::sync::watch::Sender<RequestId>,
 cancel: CancellationToken,
 }
 
 impl ClientDataRemote {
-pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
+pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<RequestId> {
 &mut self.session
 }
@@ -192,6 +193,7 @@ mod tests {
 use std::sync::atomic::AtomicBool;
 
 use super::*;
+use crate::id::ComputeConnId;
 use crate::proxy::NeonOptions;
 use crate::serverless::cancel_set::CancelSet;
 use crate::types::{BranchId, EndpointId, ProjectId};
@@ -225,9 +227,9 @@ mod tests {
 compute_id: "compute".into(),
 cold_start_info: crate::control_plane::messages::ColdStartInfo::Warm,
 },
-conn_id: uuid::Uuid::new_v4(),
+compute_conn_id: ComputeConnId::new(),
 data: ClientDataEnum::Remote(ClientDataRemote {
-session: tokio::sync::watch::Sender::new(uuid::Uuid::new_v4()),
+session: tokio::sync::watch::Sender::new(RequestId::new()),
 cancel: CancellationToken::new(),
 }),
 }
@@ -19,6 +19,7 @@ use super::local_conn_pool::ClientDataLocal;
 use crate::auth::backend::ComputeUserInfo;
 use crate::context::RequestContext;
 use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
+use crate::id::ComputeConnId;
 use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
 use crate::protocol2::ConnectionInfoExtra;
 use crate::types::{DbName, EndpointCacheKey, RoleName};
@@ -58,7 +59,7 @@ pub(crate) enum ClientDataEnum {
 pub(crate) struct ClientInnerCommon<C: ClientInnerExt> {
 pub(crate) inner: C,
 pub(crate) aux: MetricsAuxInfo,
-pub(crate) conn_id: uuid::Uuid,
+pub(crate) compute_conn_id: ComputeConnId,
 pub(crate) data: ClientDataEnum, // custom client data like session, key, jti
 }
@@ -77,8 +78,8 @@ impl<C: ClientInnerExt> Drop for ClientInnerCommon<C> {
 }
 
 impl<C: ClientInnerExt> ClientInnerCommon<C> {
-pub(crate) fn get_conn_id(&self) -> uuid::Uuid {
-self.conn_id
+pub(crate) fn get_conn_id(&self) -> ComputeConnId {
+self.compute_conn_id
 }
 
 pub(crate) fn get_data(&mut self) -> &mut ClientDataEnum {
@@ -144,7 +145,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
 pub(crate) fn remove_client(
 &mut self,
 db_user: (DbName, RoleName),
-conn_id: uuid::Uuid,
+conn_id: ComputeConnId,
 ) -> bool {
 let Self {
 pools,
@@ -189,7 +190,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
 }
 
 pub(crate) fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInnerCommon<C>) {
-let conn_id = client.get_conn_id();
+let compute_conn_id = client.get_conn_id();
 let (max_conn, conn_count, pool_name) = {
 let pool = pool.read();
 (
@@ -201,12 +202,12 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
 };
 
 if client.inner.is_closed() {
-info!(%conn_id, "{}: throwing away connection '{conn_info}' because connection is closed", pool_name);
+info!(%compute_conn_id, "{}: throwing away connection '{conn_info}' because connection is closed", pool_name);
 return;
 }
 
 if conn_count >= max_conn {
-info!(%conn_id, "{}: throwing away connection '{conn_info}' because pool is full", pool_name);
+info!(%compute_conn_id, "{}: throwing away connection '{conn_info}' because pool is full", pool_name);
 return;
 }
@@ -241,9 +242,9 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
 
 // do logging outside of the mutex
 if returned {
-debug!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
+debug!(%compute_conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
 } else {
-info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
+info!(%compute_conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
 }
 }
 }
@@ -18,6 +18,7 @@ use super::conn_pool_lib::{
 };
 use crate::context::RequestContext;
 use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
+use crate::id::ComputeConnId;
 use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
 use crate::protocol2::ConnectionInfoExtra;
 use crate::types::EndpointCacheKey;
@@ -65,7 +66,7 @@ impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
 }
 }
 
-fn remove_conn(&mut self, conn_id: uuid::Uuid) -> bool {
+fn remove_conn(&mut self, conn_id: ComputeConnId) -> bool {
 let Self {
 conns,
 global_connections_count,
@@ -73,7 +74,7 @@ impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
 } = self;
 
 let old_len = conns.len();
-conns.retain(|entry| entry.conn.conn_id != conn_id);
+conns.retain(|entry| entry.conn.compute_conn_id != conn_id);
 let new_len = conns.len();
 let removed = old_len - new_len;
 if removed > 0 {
@@ -135,7 +136,10 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
 return result;
 };
 
-tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
+tracing::Span::current().record(
+"conn_id",
+tracing::field::display(client.conn.compute_conn_id),
+);
 debug!(
 cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
 "pool: reusing connection '{conn_info}'"
@@ -194,13 +198,13 @@ pub(crate) fn poll_http2_client(
 conn_info: &ConnInfo,
 client: Send,
 connection: Connect,
-conn_id: uuid::Uuid,
+compute_conn_id: ComputeConnId,
 aux: MetricsAuxInfo,
 ) -> Client<Send> {
 let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
 let session_id = ctx.session_id();
 
-let span = info_span!(parent: None, "connection", %conn_id);
+let span = info_span!(parent: None, "connection", %compute_conn_id);
 let cold_start_info = ctx.cold_start_info();
 span.in_scope(|| {
 info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
@@ -212,7 +216,7 @@ pub(crate) fn poll_http2_client(
 let client = ClientInnerCommon {
 inner: client.clone(),
 aux: aux.clone(),
-conn_id,
+compute_conn_id,
 data: ClientDataEnum::Http(ClientDataHttp()),
 };
 pool.write().conns.push_back(ConnPoolEntry {
@@ -241,7 +245,7 @@ pub(crate) fn poll_http2_client(
 
 // remove from connection pool
 if let Some(pool) = pool.clone().upgrade()
-&& pool.write().remove_conn(conn_id)
+&& pool.write().remove_conn(compute_conn_id)
 {
 info!("closed connection removed");
 }
@@ -252,7 +256,7 @@ pub(crate) fn poll_http2_client(
 let client = ClientInnerCommon {
 inner: client,
 aux,
-conn_id,
+compute_conn_id,
 data: ClientDataEnum::Http(ClientDataHttp()),
 };
@@ -10,7 +10,6 @@ use http_body_util::{BodyExt, Full};
 use http_utils::error::ApiError;
 use serde::Serialize;
 use url::Url;
-use uuid::Uuid;
 
 use super::conn_pool::{AuthData, ConnInfoWithAuth};
 use super::conn_pool_lib::ConnInfo;
@@ -18,6 +17,7 @@ use super::error::{ConnInfoError, Credentials};
 use crate::auth::backend::ComputeUserInfo;
 use crate::config::AuthenticationConfig;
 use crate::context::RequestContext;
+use crate::id::RequestId;
 use crate::metrics::{Metrics, SniGroup, SniKind};
 use crate::pqproto::StartupMessageParams;
 use crate::proxy::NeonOptions;
@@ -34,9 +34,8 @@ pub(super) static TXN_ISOLATION_LEVEL: HeaderName =
 pub(super) static TXN_READ_ONLY: HeaderName = HeaderName::from_static("neon-batch-read-only");
 pub(super) static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrable");
 
-pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue {
-let mut uuid = [0; uuid::fmt::Hyphenated::LENGTH];
-HeaderValue::from_str(id.as_hyphenated().encode_lower(&mut uuid[..]))
+pub(crate) fn uuid_to_header_value(id: RequestId) -> HeaderValue {
+HeaderValue::from_maybe_shared(Bytes::from(id.to_string().into_bytes()))
 .expect("uuid hyphenated format should be all valid header characters")
 }
@@ -40,6 +40,7 @@ use super::conn_pool_lib::{
 use super::sql_over_http::SqlOverHttpError;
 use crate::context::RequestContext;
 use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
+use crate::id::{ComputeConnId, RequestId};
 use crate::metrics::Metrics;
 
 pub(crate) const EXT_NAME: &str = "pg_session_jwt";
@@ -48,14 +49,14 @@ pub(crate) const EXT_SCHEMA: &str = "auth";
 
 #[derive(Clone)]
 pub(crate) struct ClientDataLocal {
-session: tokio::sync::watch::Sender<uuid::Uuid>,
+session: tokio::sync::watch::Sender<RequestId>,
 cancel: CancellationToken,
 key: SigningKey,
 jti: u64,
 }
 
 impl ClientDataLocal {
-pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
+pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<RequestId> {
 &mut self.session
 }
@@ -167,14 +168,14 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
 client: C,
 mut connection: postgres_client::Connection<TcpStream, NoTlsStream>,
 key: SigningKey,
-conn_id: uuid::Uuid,
+compute_conn_id: ComputeConnId,
 aux: MetricsAuxInfo,
 ) -> Client<C> {
 let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
 let mut session_id = ctx.session_id();
 let (tx, mut rx) = tokio::sync::watch::channel(session_id);
 
-let span = info_span!(parent: None, "connection", %conn_id);
+let span = info_span!(parent: None, "connection", %compute_conn_id);
 let cold_start_info = ctx.cold_start_info();
 span.in_scope(|| {
 info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
@@ -218,7 +219,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
 if let Some(pool) = pool.clone().upgrade() {
 // remove client from pool - should close the connection if it's idle.
 // does nothing if the client is currently checked-out and in-use
-if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
+if pool.global_pool.write().remove_client(db_user.clone(), compute_conn_id) {
 info!("idle connection removed");
 }
 }
@@ -250,7 +251,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
 
 // remove from connection pool
 if let Some(pool) = pool.clone().upgrade()
-&& pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
+&& pool.global_pool.write().remove_client(db_user.clone(), compute_conn_id) {
 info!("closed connection removed");
 }
@@ -263,7 +264,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
 let inner = ClientInnerCommon {
 inner: client,
 aux,
-conn_id,
+compute_conn_id,
 data: ClientDataEnum::Local(ClientDataLocal {
 session: tx,
 cancel,
@@ -16,16 +16,16 @@ mod websocket;
 
 use std::net::{IpAddr, SocketAddr};
 use std::pin::{Pin, pin};
+use std::str::FromStr;
 use std::sync::Arc;
 
 use anyhow::Context;
 use arc_swap::ArcSwapOption;
 use async_trait::async_trait;
 use atomic_take::AtomicTake;
+use bytes::Bytes;
 pub use conn_pool_lib::GlobalConnPoolOptions;
-use futures::TryFutureExt;
 use futures::future::{Either, select};
+use futures::{FutureExt, TryFutureExt};
 use http::{Method, Response, StatusCode};
 use http_body_util::combinators::BoxBody;
 use http_body_util::{BodyExt, Empty};
@@ -48,7 +48,8 @@ use crate::cancellation::CancellationHandler;
 use crate::config::{ProxyConfig, ProxyProtocolV2};
 use crate::context::RequestContext;
 use crate::ext::TaskExt;
-use crate::metrics::Metrics;
+use crate::id::{ClientConnId, RequestId};
+use crate::metrics::{Metrics, Protocol};
 use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
 use crate::rate_limiter::EndpointRateLimiter;
 use crate::serverless::backend::PoolingBackend;
@@ -131,13 +132,12 @@ pub async fn task_main(
 tracing::error!("could not set nodelay: {e}");
 continue;
 }
-let conn_id = uuid::Uuid::new_v4();
-let http_conn_span = tracing::info_span!("http_conn", ?conn_id);
+let conn_id = ClientConnId::new();
 
 let n_connections = Metrics::get()
 .proxy
 .client_connections
-.sample(crate::metrics::Protocol::Http);
+.sample(Protocol::Http);
 tracing::trace!(?n_connections, threshold = ?config.http_config.client_conn_threshold, "check");
 if n_connections > config.http_config.client_conn_threshold {
 tracing::trace!("attempting to cancel a random connection");
@@ -154,46 +154,41 @@ pub async fn task_main(
 let cancellation_handler = cancellation_handler.clone();
 let endpoint_rate_limiter = endpoint_rate_limiter.clone();
 let cancellations = cancellations.clone();
-connections.spawn(
-async move {
-let conn_token2 = conn_token.clone();
-let _cancel_guard = config.http_config.cancel_set.insert(conn_id, conn_token2);
+connections.spawn(async move {
+let conn_token2 = conn_token.clone();
+let _cancel_guard = config.http_config.cancel_set.insert(conn_id, conn_token2);
 
-let session_id = uuid::Uuid::new_v4();
-
-let _gauge = Metrics::get()
-.proxy
-.client_connections
-.guard(crate::metrics::Protocol::Http);
+let _gauge = Metrics::get()
+.proxy
+.client_connections
+.guard(Protocol::Http);
 
-let startup_result = Box::pin(connection_startup(
-config,
-tls_acceptor,
-session_id,
-conn,
-peer_addr,
-))
-.await;
-let Some((conn, conn_info)) = startup_result else {
-return;
-};
+let startup_result = Box::pin(connection_startup(
+config,
+tls_acceptor,
+conn_id,
+conn,
+peer_addr,
+))
+.await;
+let Some((conn, conn_info)) = startup_result else {
+return;
+};
 
-Box::pin(connection_handler(
-config,
-backend,
-connections2,
-cancellations,
-cancellation_handler,
-endpoint_rate_limiter,
-conn_token,
-conn,
-conn_info,
-session_id,
-))
-.await;
-}
-.instrument(http_conn_span),
-);
+Box::pin(connection_handler(
+config,
+backend,
+connections2,
+cancellations,
+cancellation_handler,
+endpoint_rate_limiter,
+conn_token,
+conn,
+conn_info,
+conn_id,
+))
+.await;
+});
 }
 
 connections.wait().await;
@@ -230,7 +225,7 @@ impl MaybeTlsAcceptor for &'static ArcSwapOption<crate::config::TlsConfig> {
 async fn connection_startup(
 config: &ProxyConfig,
 tls_acceptor: Arc<dyn MaybeTlsAcceptor>,
-session_id: uuid::Uuid,
+conn_id: ClientConnId,
 conn: TcpStream,
 peer_addr: SocketAddr,
 ) -> Option<(AsyncRW, ConnectionInfo)> {
@@ -265,12 +260,12 @@ async fn connection_startup(
 IpAddr::V4(ip) => ip.is_private(),
 IpAddr::V6(_) => false,
 };
-info!(?session_id, %conn_info, "accepted new TCP connection");
+info!(%conn_id, %conn_info, "accepted new TCP connection");
 
 // try upgrade to TLS, but with a timeout.
 let conn = match timeout(config.handshake_timeout, tls_acceptor.accept(conn)).await {
 Ok(Ok(conn)) => {
-info!(?session_id, %conn_info, "accepted new TLS connection");
+info!(%conn_id, %conn_info, "accepted new TLS connection");
 conn
 }
 // The handshake failed
@@ -278,7 +273,7 @@ async fn connection_startup(
 if !has_private_peer_addr {
 Metrics::get().proxy.tls_handshake_failures.inc();
 }
-warn!(?session_id, %conn_info, "failed to accept TLS connection: {e:?}");
+warn!(%conn_id, %conn_info, "failed to accept TLS connection: {e:?}");
 return None;
 }
 // The handshake timed out
@@ -286,7 +281,7 @@ async fn connection_startup(
 if !has_private_peer_addr {
 Metrics::get().proxy.tls_handshake_failures.inc();
 }
-warn!(?session_id, %conn_info, "failed to accept TLS connection: {e:?}");
+warn!(%conn_id, %conn_info, "failed to accept TLS connection: {e:?}");
 return None;
 }
 };
@@ -309,10 +304,8 @@ async fn connection_handler(
 cancellation_token: CancellationToken,
 conn: AsyncRW,
 conn_info: ConnectionInfo,
-session_id: uuid::Uuid,
+conn_id: ClientConnId,
 ) {
-let session_id = AtomicTake::new(session_id);
-
 // Cancel all current inflight HTTP requests if the HTTP connection is closed.
 let http_cancellation_token = CancellationToken::new();
 let _cancel_connection = http_cancellation_token.clone().drop_guard();
@@ -322,20 +315,6 @@ async fn connection_handler(
 let conn = server.serve_connection_with_upgrades(
 hyper_util::rt::TokioIo::new(conn),
 hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
-// First HTTP request shares the same session ID
-let mut session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
-
-if matches!(backend.auth_backend, crate::auth::Backend::Local(_)) {
-// take session_id from request, if given.
-if let Some(id) = req
-.headers()
-.get(&NEON_REQUEST_ID)
-.and_then(|id| uuid::Uuid::try_parse_ascii(id.as_bytes()).ok())
-{
-session_id = id;
-}
-}
-
 // Cancel the current inflight HTTP request if the requets stream is closed.
 // This is slightly different to `_cancel_connection` in that
 // h2 can cancel individual requests with a `RST_STREAM`.
@@ -352,7 +331,7 @@ async fn connection_handler(
 backend.clone(),
 connections.clone(),
 cancellation_handler.clone(),
-session_id,
+conn_id,
 conn_info2.clone(),
 http_request_token,
 endpoint_rate_limiter.clone(),
@@ -362,15 +341,8 @@ async fn connection_handler(
 .map_ok_or_else(api_error_into_response, |r| r),
 );
 async move {
-let mut res = handler.await;
+let res = handler.await;
 cancel_request.disarm();
-
-// add the session ID to the response
-if let Ok(resp) = &mut res {
-resp.headers_mut()
-.append(&NEON_REQUEST_ID, uuid_to_header_value(session_id));
-}
-
 res
 }
 }),
@@ -392,6 +364,44 @@ async fn connection_handler(
 }
 }
 
+fn get_request_id(backend: &PoolingBackend, req: &hyper::Request<Incoming>) -> RequestId {
+if matches!(backend.auth_backend, crate::auth::Backend::Local(_)) {
+// take session_id from request, if given.
+if let Some(id) = req
+.headers()
+.get(&NEON_REQUEST_ID)
+.and_then(|id| uuid::Uuid::try_parse_ascii(id.as_bytes()).ok())
+{
+return RequestId::from_uuid(id);
+}
+
+if let Some(id) = req
+.headers()
+.get(&NEON_REQUEST_ID)
+.and_then(|id| id.to_str().ok())
+.and_then(|id| RequestId::from_str(id).ok())
+{
+return id;
+}
+}
+
+RequestId::new()
+}
+
+fn set_request_id<T, E>(
+mut res: Result<hyper::Response<T>, E>,
+session_id: RequestId,
+) -> Result<hyper::Response<T>, E> {
+// add the session ID to the response
+if let Ok(resp) = &mut res {
+resp.headers_mut()
+.append(&NEON_REQUEST_ID, uuid_to_header_value(session_id));
+}
+
+res
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn request_handler(
 mut request: hyper::Request<Incoming>,
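
get_request_id keeps backward compatibility for local-proxy callers: the NEON_REQUEST_ID request header is first parsed as a plain UUID, then as a full TypeID string via RequestId::from_str, and a fresh RequestId is generated when neither form matches (or the backend is not local).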
@@ -399,7 +409,7 @@ async fn request_handler(
 backend: Arc<PoolingBackend>,
 ws_connections: TaskTracker,
 cancellation_handler: Arc<CancellationHandler>,
-session_id: uuid::Uuid,
+conn_id: ClientConnId,
 conn_info: ConnectionInfo,
 // used to cancel in-flight HTTP requests. not used to cancel websockets
 http_cancellation_token: CancellationToken,
@@ -417,7 +427,8 @@ async fn request_handler(
 if config.http_config.accept_websockets
 && framed_websockets::upgrade::is_upgrade_request(&request)
 {
-let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Ws);
+let session_id = RequestId::from_uuid(conn_id.uuid());
+let ctx = RequestContext::new(conn_id, session_id, conn_info, Protocol::Ws);
 
 ctx.set_user_agent(
 request
@@ -457,7 +468,8 @@ async fn request_handler(
 // Return the response so the spawned future can continue.
 Ok(response.map(|b| b.map_err(|x| match x {}).boxed()))
 } else if request.uri().path() == "/sql" && *request.method() == Method::POST {
-let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
+let session_id = get_request_id(&backend, &request);
+let ctx = RequestContext::new(conn_id, session_id, conn_info, Protocol::Http);
 let span = ctx.span();
 
 let testodrome_id = request
@@ -473,6 +485,7 @@ async fn request_handler(
 
 sql_over_http::handle(config, ctx, request, backend, http_cancellation_token)
 .instrument(span)
+.map(|res| set_request_id(res, session_id))
 .await
 } else if request.uri().path() == "/sql" && *request.method() == Method::OPTIONS {
 Response::builder()