proxy: Rename RequestMonitoring to RequestContext (#9805)

## Problem

It is called context/ctx everywhere and the Monitoring suffix needlessly
confuses with proper monitoring code.

## Summary of changes

* Rename RequestMonitoring to RequestContext
* Rename RequestMonitoringInner to RequestContextInner
This commit is contained in:
Folke Behrens
2024-11-20 13:50:36 +01:00
committed by GitHub
parent 899933e159
commit bf7d859a8b
32 changed files with 162 additions and 163 deletions

View File

@@ -5,13 +5,13 @@ use super::{ComputeCredentials, ComputeUserInfo};
use crate::auth::backend::ComputeCredentialKeys; use crate::auth::backend::ComputeCredentialKeys;
use crate::auth::{self, AuthFlow}; use crate::auth::{self, AuthFlow};
use crate::config::AuthenticationConfig; use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::AuthSecret; use crate::control_plane::AuthSecret;
use crate::stream::{PqStream, Stream}; use crate::stream::{PqStream, Stream};
use crate::{compute, sasl}; use crate::{compute, sasl};
pub(super) async fn authenticate( pub(super) async fn authenticate(
ctx: &RequestMonitoring, ctx: &RequestContext,
creds: ComputeUserInfo, creds: ComputeUserInfo,
client: &mut PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>, client: &mut PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
config: &'static AuthenticationConfig, config: &'static AuthenticationConfig,

View File

@@ -8,7 +8,7 @@ use tracing::{info, info_span};
use super::ComputeCredentialKeys; use super::ComputeCredentialKeys;
use crate::cache::Cached; use crate::cache::Cached;
use crate::config::AuthenticationConfig; use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::{self, CachedNodeInfo, NodeInfo}; use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::error::{ReportableError, UserFacingError}; use crate::error::{ReportableError, UserFacingError};
use crate::proxy::connect_compute::ComputeConnectBackend; use crate::proxy::connect_compute::ComputeConnectBackend;
@@ -71,7 +71,7 @@ impl ConsoleRedirectBackend {
pub(crate) async fn authenticate( pub(crate) async fn authenticate(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
auth_config: &'static AuthenticationConfig, auth_config: &'static AuthenticationConfig,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>, client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<ConsoleRedirectNodeInfo> { ) -> auth::Result<ConsoleRedirectNodeInfo> {
@@ -87,7 +87,7 @@ pub struct ConsoleRedirectNodeInfo(pub(super) NodeInfo);
impl ComputeConnectBackend for ConsoleRedirectNodeInfo { impl ComputeConnectBackend for ConsoleRedirectNodeInfo {
async fn wake_compute( async fn wake_compute(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> { ) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
Ok(Cached::new_uncached(self.0.clone())) Ok(Cached::new_uncached(self.0.clone()))
} }
@@ -98,7 +98,7 @@ impl ComputeConnectBackend for ConsoleRedirectNodeInfo {
} }
async fn authenticate( async fn authenticate(
ctx: &RequestMonitoring, ctx: &RequestContext,
auth_config: &'static AuthenticationConfig, auth_config: &'static AuthenticationConfig,
link_uri: &reqwest::Url, link_uri: &reqwest::Url,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>, client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,

View File

@@ -4,7 +4,7 @@ use tracing::{debug, info};
use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint}; use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint};
use crate::auth::{self, AuthFlow}; use crate::auth::{self, AuthFlow};
use crate::config::AuthenticationConfig; use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::AuthSecret; use crate::control_plane::AuthSecret;
use crate::intern::EndpointIdInt; use crate::intern::EndpointIdInt;
use crate::sasl; use crate::sasl;
@@ -15,7 +15,7 @@ use crate::stream::{self, Stream};
/// These properties are benefical for serverless JS workers, so we /// These properties are benefical for serverless JS workers, so we
/// use this mechanism for websocket connections. /// use this mechanism for websocket connections.
pub(crate) async fn authenticate_cleartext( pub(crate) async fn authenticate_cleartext(
ctx: &RequestMonitoring, ctx: &RequestContext,
info: ComputeUserInfo, info: ComputeUserInfo,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>, client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
secret: AuthSecret, secret: AuthSecret,
@@ -57,7 +57,7 @@ pub(crate) async fn authenticate_cleartext(
/// Similar to [`authenticate_cleartext`], but there's a specific password format, /// Similar to [`authenticate_cleartext`], but there's a specific password format,
/// and passwords are not yet validated (we don't know how to validate them!) /// and passwords are not yet validated (we don't know how to validate them!)
pub(crate) async fn password_hack_no_authentication( pub(crate) async fn password_hack_no_authentication(
ctx: &RequestMonitoring, ctx: &RequestContext,
info: ComputeUserInfoNoEndpoint, info: ComputeUserInfoNoEndpoint,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>, client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
) -> auth::Result<(ComputeUserInfo, Vec<u8>)> { ) -> auth::Result<(ComputeUserInfo, Vec<u8>)> {

View File

@@ -17,7 +17,7 @@ use thiserror::Error;
use tokio::time::Instant; use tokio::time::Instant;
use crate::auth::backend::ComputeCredentialKeys; use crate::auth::backend::ComputeCredentialKeys;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::errors::GetEndpointJwksError; use crate::control_plane::errors::GetEndpointJwksError;
use crate::http::read_body_with_limit; use crate::http::read_body_with_limit;
use crate::intern::RoleNameInt; use crate::intern::RoleNameInt;
@@ -39,7 +39,7 @@ const JWKS_FETCH_RETRIES: u32 = 3;
pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static { pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static {
fn fetch_auth_rules( fn fetch_auth_rules(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
endpoint: EndpointId, endpoint: EndpointId,
) -> impl Future<Output = Result<Vec<AuthRule>, FetchAuthRulesError>> + Send; ) -> impl Future<Output = Result<Vec<AuthRule>, FetchAuthRulesError>> + Send;
} }
@@ -144,7 +144,7 @@ impl JwkCacheEntryLock {
async fn renew_jwks<F: FetchAuthRules>( async fn renew_jwks<F: FetchAuthRules>(
&self, &self,
_permit: JwkRenewalPermit<'_>, _permit: JwkRenewalPermit<'_>,
ctx: &RequestMonitoring, ctx: &RequestContext,
client: &reqwest_middleware::ClientWithMiddleware, client: &reqwest_middleware::ClientWithMiddleware,
endpoint: EndpointId, endpoint: EndpointId,
auth_rules: &F, auth_rules: &F,
@@ -261,7 +261,7 @@ impl JwkCacheEntryLock {
async fn get_or_update_jwk_cache<F: FetchAuthRules>( async fn get_or_update_jwk_cache<F: FetchAuthRules>(
self: &Arc<Self>, self: &Arc<Self>,
ctx: &RequestMonitoring, ctx: &RequestContext,
client: &reqwest_middleware::ClientWithMiddleware, client: &reqwest_middleware::ClientWithMiddleware,
endpoint: EndpointId, endpoint: EndpointId,
fetch: &F, fetch: &F,
@@ -314,7 +314,7 @@ impl JwkCacheEntryLock {
async fn check_jwt<F: FetchAuthRules>( async fn check_jwt<F: FetchAuthRules>(
self: &Arc<Self>, self: &Arc<Self>,
ctx: &RequestMonitoring, ctx: &RequestContext,
jwt: &str, jwt: &str,
client: &reqwest_middleware::ClientWithMiddleware, client: &reqwest_middleware::ClientWithMiddleware,
endpoint: EndpointId, endpoint: EndpointId,
@@ -409,7 +409,7 @@ impl JwkCacheEntryLock {
impl JwkCache { impl JwkCache {
pub(crate) async fn check_jwt<F: FetchAuthRules>( pub(crate) async fn check_jwt<F: FetchAuthRules>(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
endpoint: EndpointId, endpoint: EndpointId,
role_name: &RoleName, role_name: &RoleName,
fetch: &F, fetch: &F,
@@ -941,7 +941,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
impl FetchAuthRules for Fetch { impl FetchAuthRules for Fetch {
async fn fetch_auth_rules( async fn fetch_auth_rules(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
_endpoint: EndpointId, _endpoint: EndpointId,
) -> Result<Vec<AuthRule>, FetchAuthRulesError> { ) -> Result<Vec<AuthRule>, FetchAuthRulesError> {
Ok(self.0.clone()) Ok(self.0.clone())
@@ -1039,7 +1039,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
for token in &tokens { for token in &tokens {
jwk_cache jwk_cache
.check_jwt( .check_jwt(
&RequestMonitoring::test(), &RequestContext::test(),
endpoint.clone(), endpoint.clone(),
role, role,
&fetch, &fetch,
@@ -1097,7 +1097,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
jwk_cache jwk_cache
.check_jwt( .check_jwt(
&RequestMonitoring::test(), &RequestContext::test(),
endpoint.clone(), endpoint.clone(),
&role_name, &role_name,
&fetch, &fetch,
@@ -1136,7 +1136,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
let ep = EndpointId::from("ep"); let ep = EndpointId::from("ep");
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let err = jwk_cache let err = jwk_cache
.check_jwt(&ctx, ep, &role, &fetch, &bad_jwt) .check_jwt(&ctx, ep, &role, &fetch, &bad_jwt)
.await .await
@@ -1175,7 +1175,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
// this role_name is not accepted // this role_name is not accepted
let bad_role_name = RoleName::from("cloud_admin"); let bad_role_name = RoleName::from("cloud_admin");
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let err = jwk_cache let err = jwk_cache
.check_jwt(&ctx, ep, &bad_role_name, &fetch, &jwt) .check_jwt(&ctx, ep, &bad_role_name, &fetch, &jwt)
.await .await
@@ -1268,7 +1268,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
let ep = EndpointId::from("ep"); let ep = EndpointId::from("ep");
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
for test in table { for test in table {
let jwt = new_custom_ec_jwt("1".into(), &key, test.body); let jwt = new_custom_ec_jwt("1".into(), &key, test.body);
@@ -1336,7 +1336,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
jwk_cache jwk_cache
.check_jwt( .check_jwt(
&RequestMonitoring::test(), &RequestContext::test(),
endpoint.clone(), endpoint.clone(),
&role_name, &role_name,
&fetch, &fetch,

View File

@@ -7,7 +7,7 @@ use super::jwt::{AuthRule, FetchAuthRules};
use crate::auth::backend::jwt::FetchAuthRulesError; use crate::auth::backend::jwt::FetchAuthRulesError;
use crate::compute::ConnCfg; use crate::compute::ConnCfg;
use crate::compute_ctl::ComputeCtlApi; use crate::compute_ctl::ComputeCtlApi;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo}; use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo};
use crate::control_plane::NodeInfo; use crate::control_plane::NodeInfo;
use crate::http; use crate::http;
@@ -56,7 +56,7 @@ pub static JWKS_ROLE_MAP: ArcSwapOption<EndpointJwksResponse> = ArcSwapOption::c
impl FetchAuthRules for StaticAuthRules { impl FetchAuthRules for StaticAuthRules {
async fn fetch_auth_rules( async fn fetch_auth_rules(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
_endpoint: EndpointId, _endpoint: EndpointId,
) -> Result<Vec<AuthRule>, FetchAuthRulesError> { ) -> Result<Vec<AuthRule>, FetchAuthRulesError> {
let mappings = JWKS_ROLE_MAP.load(); let mappings = JWKS_ROLE_MAP.load();

View File

@@ -20,7 +20,7 @@ use crate::auth::credentials::check_peer_addr_is_in_list;
use crate::auth::{self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint}; use crate::auth::{self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint};
use crate::cache::Cached; use crate::cache::Cached;
use crate::config::AuthenticationConfig; use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::client::ControlPlaneClient; use crate::control_plane::client::ControlPlaneClient;
use crate::control_plane::errors::GetAuthInfoError; use crate::control_plane::errors::GetAuthInfoError;
use crate::control_plane::{ use crate::control_plane::{
@@ -210,7 +210,7 @@ impl RateBucketInfo {
impl AuthenticationConfig { impl AuthenticationConfig {
pub(crate) fn check_rate_limit( pub(crate) fn check_rate_limit(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
secret: AuthSecret, secret: AuthSecret,
endpoint: &EndpointId, endpoint: &EndpointId,
is_cleartext: bool, is_cleartext: bool,
@@ -265,7 +265,7 @@ impl AuthenticationConfig {
/// ///
/// All authentication flows will emit an AuthenticationOk message if successful. /// All authentication flows will emit an AuthenticationOk message if successful.
async fn auth_quirks( async fn auth_quirks(
ctx: &RequestMonitoring, ctx: &RequestContext,
api: &impl control_plane::ControlPlaneApi, api: &impl control_plane::ControlPlaneApi,
user_info: ComputeUserInfoMaybeEndpoint, user_info: ComputeUserInfoMaybeEndpoint,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>, client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
@@ -343,7 +343,7 @@ async fn auth_quirks(
} }
async fn authenticate_with_secret( async fn authenticate_with_secret(
ctx: &RequestMonitoring, ctx: &RequestContext,
secret: AuthSecret, secret: AuthSecret,
info: ComputeUserInfo, info: ComputeUserInfo,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>, client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
@@ -396,7 +396,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
#[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)] #[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)]
pub(crate) async fn authenticate( pub(crate) async fn authenticate(
self, self,
ctx: &RequestMonitoring, ctx: &RequestContext,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>, client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
allow_cleartext: bool, allow_cleartext: bool,
config: &'static AuthenticationConfig, config: &'static AuthenticationConfig,
@@ -436,7 +436,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> {
impl Backend<'_, ComputeUserInfo> { impl Backend<'_, ComputeUserInfo> {
pub(crate) async fn get_role_secret( pub(crate) async fn get_role_secret(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
) -> Result<CachedRoleSecret, GetAuthInfoError> { ) -> Result<CachedRoleSecret, GetAuthInfoError> {
match self { match self {
Self::ControlPlane(api, user_info) => api.get_role_secret(ctx, user_info).await, Self::ControlPlane(api, user_info) => api.get_role_secret(ctx, user_info).await,
@@ -446,7 +446,7 @@ impl Backend<'_, ComputeUserInfo> {
pub(crate) async fn get_allowed_ips_and_secret( pub(crate) async fn get_allowed_ips_and_secret(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> { ) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
match self { match self {
Self::ControlPlane(api, user_info) => { Self::ControlPlane(api, user_info) => {
@@ -461,7 +461,7 @@ impl Backend<'_, ComputeUserInfo> {
impl ComputeConnectBackend for Backend<'_, ComputeCredentials> { impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
async fn wake_compute( async fn wake_compute(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> { ) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
match self { match self {
Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await, Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await,
@@ -497,7 +497,7 @@ mod tests {
use crate::auth::backend::MaskedIp; use crate::auth::backend::MaskedIp;
use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern}; use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern};
use crate::config::AuthenticationConfig; use crate::config::AuthenticationConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::{self, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret}; use crate::control_plane::{self, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret};
use crate::proxy::NeonOptions; use crate::proxy::NeonOptions;
use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo}; use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo};
@@ -513,7 +513,7 @@ mod tests {
impl control_plane::ControlPlaneApi for Auth { impl control_plane::ControlPlaneApi for Auth {
async fn get_role_secret( async fn get_role_secret(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
_user_info: &super::ComputeUserInfo, _user_info: &super::ComputeUserInfo,
) -> Result<CachedRoleSecret, control_plane::errors::GetAuthInfoError> { ) -> Result<CachedRoleSecret, control_plane::errors::GetAuthInfoError> {
Ok(CachedRoleSecret::new_uncached(Some(self.secret.clone()))) Ok(CachedRoleSecret::new_uncached(Some(self.secret.clone())))
@@ -521,7 +521,7 @@ mod tests {
async fn get_allowed_ips_and_secret( async fn get_allowed_ips_and_secret(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
_user_info: &super::ComputeUserInfo, _user_info: &super::ComputeUserInfo,
) -> Result< ) -> Result<
(CachedAllowedIps, Option<CachedRoleSecret>), (CachedAllowedIps, Option<CachedRoleSecret>),
@@ -535,7 +535,7 @@ mod tests {
async fn get_endpoint_jwks( async fn get_endpoint_jwks(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
_endpoint: crate::types::EndpointId, _endpoint: crate::types::EndpointId,
) -> Result<Vec<super::jwt::AuthRule>, control_plane::errors::GetEndpointJwksError> ) -> Result<Vec<super::jwt::AuthRule>, control_plane::errors::GetEndpointJwksError>
{ {
@@ -544,7 +544,7 @@ mod tests {
async fn wake_compute( async fn wake_compute(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
_user_info: &super::ComputeUserInfo, _user_info: &super::ComputeUserInfo,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> { ) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
unimplemented!() unimplemented!()
@@ -623,7 +623,7 @@ mod tests {
let (mut client, server) = tokio::io::duplex(1024); let (mut client, server) = tokio::io::duplex(1024);
let mut stream = PqStream::new(Stream::from_raw(server)); let mut stream = PqStream::new(Stream::from_raw(server));
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let api = Auth { let api = Auth {
ips: vec![], ips: vec![],
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()), secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
@@ -700,7 +700,7 @@ mod tests {
let (mut client, server) = tokio::io::duplex(1024); let (mut client, server) = tokio::io::duplex(1024);
let mut stream = PqStream::new(Stream::from_raw(server)); let mut stream = PqStream::new(Stream::from_raw(server));
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let api = Auth { let api = Auth {
ips: vec![], ips: vec![],
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()), secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
@@ -752,7 +752,7 @@ mod tests {
let (mut client, server) = tokio::io::duplex(1024); let (mut client, server) = tokio::io::duplex(1024);
let mut stream = PqStream::new(Stream::from_raw(server)); let mut stream = PqStream::new(Stream::from_raw(server));
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let api = Auth { let api = Auth {
ips: vec![], ips: vec![],
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()), secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),

View File

@@ -10,7 +10,7 @@ use thiserror::Error;
use tracing::{debug, warn}; use tracing::{debug, warn};
use crate::auth::password_hack::parse_endpoint_param; use crate::auth::password_hack::parse_endpoint_param;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::error::{ReportableError, UserFacingError}; use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, SniKind}; use crate::metrics::{Metrics, SniKind};
use crate::proxy::NeonOptions; use crate::proxy::NeonOptions;
@@ -86,7 +86,7 @@ pub(crate) fn endpoint_sni(
impl ComputeUserInfoMaybeEndpoint { impl ComputeUserInfoMaybeEndpoint {
pub(crate) fn parse( pub(crate) fn parse(
ctx: &RequestMonitoring, ctx: &RequestContext,
params: &StartupMessageParams, params: &StartupMessageParams,
sni: Option<&str>, sni: Option<&str>,
common_names: Option<&HashSet<String>>, common_names: Option<&HashSet<String>>,
@@ -260,7 +260,7 @@ mod tests {
fn parse_bare_minimum() -> anyhow::Result<()> { fn parse_bare_minimum() -> anyhow::Result<()> {
// According to postgresql, only `user` should be required. // According to postgresql, only `user` should be required.
let options = StartupMessageParams::new([("user", "john_doe")]); let options = StartupMessageParams::new([("user", "john_doe")]);
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id, None); assert_eq!(user_info.endpoint_id, None);
@@ -275,7 +275,7 @@ mod tests {
("database", "world"), // should be ignored ("database", "world"), // should be ignored
("foo", "bar"), // should be ignored ("foo", "bar"), // should be ignored
]); ]);
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id, None); assert_eq!(user_info.endpoint_id, None);
@@ -290,7 +290,7 @@ mod tests {
let sni = Some("foo.localhost"); let sni = Some("foo.localhost");
let common_names = Some(["localhost".into()].into()); let common_names = Some(["localhost".into()].into());
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = let user_info =
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.user, "john_doe");
@@ -307,7 +307,7 @@ mod tests {
("options", "-ckey=1 project=bar -c geqo=off"), ("options", "-ckey=1 project=bar -c geqo=off"),
]); ]);
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id.as_deref(), Some("bar")); assert_eq!(user_info.endpoint_id.as_deref(), Some("bar"));
@@ -322,7 +322,7 @@ mod tests {
("options", "-ckey=1 endpoint=bar -c geqo=off"), ("options", "-ckey=1 endpoint=bar -c geqo=off"),
]); ]);
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id.as_deref(), Some("bar")); assert_eq!(user_info.endpoint_id.as_deref(), Some("bar"));
@@ -340,7 +340,7 @@ mod tests {
), ),
]); ]);
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.user, "john_doe");
assert!(user_info.endpoint_id.is_none()); assert!(user_info.endpoint_id.is_none());
@@ -355,7 +355,7 @@ mod tests {
("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"), ("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"),
]); ]);
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.user, "john_doe");
assert!(user_info.endpoint_id.is_none()); assert!(user_info.endpoint_id.is_none());
@@ -370,7 +370,7 @@ mod tests {
let sni = Some("baz.localhost"); let sni = Some("baz.localhost");
let common_names = Some(["localhost".into()].into()); let common_names = Some(["localhost".into()].into());
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = let user_info =
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.user, "john_doe");
@@ -385,14 +385,14 @@ mod tests {
let common_names = Some(["a.com".into(), "b.com".into()].into()); let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.a.com"); let sni = Some("p1.a.com");
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = let user_info =
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.endpoint_id.as_deref(), Some("p1")); assert_eq!(user_info.endpoint_id.as_deref(), Some("p1"));
let common_names = Some(["a.com".into(), "b.com".into()].into()); let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.b.com"); let sni = Some("p1.b.com");
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = let user_info =
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.endpoint_id.as_deref(), Some("p1")); assert_eq!(user_info.endpoint_id.as_deref(), Some("p1"));
@@ -408,7 +408,7 @@ mod tests {
let sni = Some("second.localhost"); let sni = Some("second.localhost");
let common_names = Some(["localhost".into()].into()); let common_names = Some(["localhost".into()].into());
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref()) let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.expect_err("should fail"); .expect_err("should fail");
match err { match err {
@@ -427,7 +427,7 @@ mod tests {
let sni = Some("project.localhost"); let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into()); let common_names = Some(["example.com".into()].into());
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref()) let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.expect_err("should fail"); .expect_err("should fail");
match err { match err {
@@ -447,7 +447,7 @@ mod tests {
let sni = Some("project.localhost"); let sni = Some("project.localhost");
let common_names = Some(["localhost".into()].into()); let common_names = Some(["localhost".into()].into());
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let user_info = let user_info =
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.endpoint_id.as_deref(), Some("project")); assert_eq!(user_info.endpoint_id.as_deref(), Some("project"));

View File

@@ -11,7 +11,7 @@ use tracing::info;
use super::backend::ComputeCredentialKeys; use super::backend::ComputeCredentialKeys;
use super::{AuthError, PasswordHackPayload}; use super::{AuthError, PasswordHackPayload};
use crate::config::TlsServerEndPoint; use crate::config::TlsServerEndPoint;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::AuthSecret; use crate::control_plane::AuthSecret;
use crate::intern::EndpointIdInt; use crate::intern::EndpointIdInt;
use crate::sasl; use crate::sasl;
@@ -32,7 +32,7 @@ pub(crate) struct Begin;
/// Use [SCRAM](crate::scram)-based auth in [`AuthFlow`]. /// Use [SCRAM](crate::scram)-based auth in [`AuthFlow`].
pub(crate) struct Scram<'a>( pub(crate) struct Scram<'a>(
pub(crate) &'a scram::ServerSecret, pub(crate) &'a scram::ServerSecret,
pub(crate) &'a RequestMonitoring, pub(crate) &'a RequestContext,
); );
impl AuthMethod for Scram<'_> { impl AuthMethod for Scram<'_> {

View File

@@ -11,7 +11,7 @@ use futures::future::Either;
use futures::TryFutureExt; use futures::TryFutureExt;
use itertools::Itertools; use itertools::Itertools;
use proxy::config::TlsServerEndPoint; use proxy::config::TlsServerEndPoint;
use proxy::context::RequestMonitoring; use proxy::context::RequestContext;
use proxy::metrics::{Metrics, ThreadPoolMetrics}; use proxy::metrics::{Metrics, ThreadPoolMetrics};
use proxy::protocol2::ConnectionInfo; use proxy::protocol2::ConnectionInfo;
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource}; use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
@@ -177,7 +177,7 @@ async fn task_main(
.context("failed to set socket option")?; .context("failed to set socket option")?;
info!(%peer_addr, "serving"); info!(%peer_addr, "serving");
let ctx = RequestMonitoring::new( let ctx = RequestContext::new(
session_id, session_id,
ConnectionInfo { ConnectionInfo {
addr: peer_addr, addr: peer_addr,
@@ -208,7 +208,7 @@ async fn task_main(
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>( async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
ctx: &RequestMonitoring, ctx: &RequestContext,
raw_stream: S, raw_stream: S,
tls_config: Arc<rustls::ServerConfig>, tls_config: Arc<rustls::ServerConfig>,
tls_server_end_point: TlsServerEndPoint, tls_server_end_point: TlsServerEndPoint,
@@ -259,7 +259,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
} }
async fn handle_client( async fn handle_client(
ctx: RequestMonitoring, ctx: RequestContext,
dest_suffix: Arc<String>, dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>, tls_config: Arc<rustls::ServerConfig>,
tls_server_end_point: TlsServerEndPoint, tls_server_end_point: TlsServerEndPoint,

View File

@@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken;
use tracing::info; use tracing::info;
use crate::config::EndpointCacheConfig; use crate::config::EndpointCacheConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt}; use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
use crate::metrics::{Metrics, RedisErrors, RedisEventsCount}; use crate::metrics::{Metrics, RedisErrors, RedisEventsCount};
use crate::rate_limiter::GlobalRateLimiter; use crate::rate_limiter::GlobalRateLimiter;
@@ -75,7 +75,7 @@ impl EndpointsCache {
} }
} }
pub(crate) fn is_valid(&self, ctx: &RequestMonitoring, endpoint: &EndpointId) -> bool { pub(crate) fn is_valid(&self, ctx: &RequestContext, endpoint: &EndpointId) -> bool {
if !self.ready.load(Ordering::Acquire) { if !self.ready.load(Ordering::Acquire) {
// the endpoint cache is not yet fully initialised. // the endpoint cache is not yet fully initialised.
return true; return true;

View File

@@ -18,7 +18,7 @@ use tracing::{debug, error, info, warn};
use crate::auth::parse_endpoint_param; use crate::auth::parse_endpoint_param;
use crate::cancellation::CancelClosure; use crate::cancellation::CancelClosure;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::client::ApiLockError; use crate::control_plane::client::ApiLockError;
use crate::control_plane::errors::WakeComputeError; use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::messages::MetricsAuxInfo; use crate::control_plane::messages::MetricsAuxInfo;
@@ -286,7 +286,7 @@ impl ConnCfg {
/// Connect to a corresponding compute node. /// Connect to a corresponding compute node.
pub(crate) async fn connect( pub(crate) async fn connect(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
allow_self_signed_compute: bool, allow_self_signed_compute: bool,
aux: MetricsAuxInfo, aux: MetricsAuxInfo,
timeout: Duration, timeout: Duration,

View File

@@ -8,7 +8,7 @@ use tracing::{debug, error, info, Instrument};
use crate::auth::backend::ConsoleRedirectBackend; use crate::auth::backend::ConsoleRedirectBackend;
use crate::cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal}; use crate::cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal};
use crate::config::{ProxyConfig, ProxyProtocolV2}; use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::error::ReportableError; use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard}; use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo}; use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo};
@@ -82,7 +82,7 @@ pub async fn task_main(
} }
}; };
let ctx = RequestMonitoring::new( let ctx = RequestContext::new(
session_id, session_id,
peer_addr, peer_addr,
crate::metrics::Protocol::Tcp, crate::metrics::Protocol::Tcp,
@@ -141,7 +141,7 @@ pub async fn task_main(
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>( pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config: &'static ProxyConfig, config: &'static ProxyConfig,
backend: &'static ConsoleRedirectBackend, backend: &'static ConsoleRedirectBackend,
ctx: &RequestMonitoring, ctx: &RequestContext,
cancellation_handler: Arc<CancellationHandlerMain>, cancellation_handler: Arc<CancellationHandlerMain>,
stream: S, stream: S,
conn_gauge: NumClientConnectionsGuard<'static>, conn_gauge: NumClientConnectionsGuard<'static>,

View File

@@ -32,15 +32,15 @@ pub(crate) static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<Reques
/// ///
/// This data should **not** be used for connection logic, only for observability and limiting purposes. /// This data should **not** be used for connection logic, only for observability and limiting purposes.
/// All connection logic should instead use strongly typed state machines, not a bunch of Options. /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
pub struct RequestMonitoring( pub struct RequestContext(
/// To allow easier use of the ctx object, we have interior mutability. /// To allow easier use of the ctx object, we have interior mutability.
/// I would typically use a RefCell but that would break the `Send` requirements /// I would typically use a RefCell but that would break the `Send` requirements
/// so we need something with thread-safety. `TryLock` is a cheap alternative /// so we need something with thread-safety. `TryLock` is a cheap alternative
/// that offers similar semantics to a `RefCell` but with synchronisation. /// that offers similar semantics to a `RefCell` but with synchronisation.
TryLock<RequestMonitoringInner>, TryLock<RequestContextInner>,
); );
struct RequestMonitoringInner { struct RequestContextInner {
pub(crate) conn_info: ConnectionInfo, pub(crate) conn_info: ConnectionInfo,
pub(crate) session_id: Uuid, pub(crate) session_id: Uuid,
pub(crate) protocol: Protocol, pub(crate) protocol: Protocol,
@@ -81,10 +81,10 @@ pub(crate) enum AuthMethod {
Cleartext, Cleartext,
} }
impl Clone for RequestMonitoring { impl Clone for RequestContext {
fn clone(&self) -> Self { fn clone(&self) -> Self {
let inner = self.0.try_lock().expect("should not deadlock"); let inner = self.0.try_lock().expect("should not deadlock");
let new = RequestMonitoringInner { let new = RequestContextInner {
conn_info: inner.conn_info.clone(), conn_info: inner.conn_info.clone(),
session_id: inner.session_id, session_id: inner.session_id,
protocol: inner.protocol, protocol: inner.protocol,
@@ -115,7 +115,7 @@ impl Clone for RequestMonitoring {
} }
} }
impl RequestMonitoring { impl RequestContext {
pub fn new( pub fn new(
session_id: Uuid, session_id: Uuid,
conn_info: ConnectionInfo, conn_info: ConnectionInfo,
@@ -132,7 +132,7 @@ impl RequestMonitoring {
role = tracing::field::Empty, role = tracing::field::Empty,
); );
let inner = RequestMonitoringInner { let inner = RequestContextInner {
conn_info, conn_info,
session_id, session_id,
protocol, protocol,
@@ -168,7 +168,7 @@ impl RequestMonitoring {
let ip = IpAddr::from([127, 0, 0, 1]); let ip = IpAddr::from([127, 0, 0, 1]);
let addr = SocketAddr::new(ip, 5432); let addr = SocketAddr::new(ip, 5432);
let conn_info = ConnectionInfo { addr, extra: None }; let conn_info = ConnectionInfo { addr, extra: None };
RequestMonitoring::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test") RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test")
} }
pub(crate) fn console_application_name(&self) -> String { pub(crate) fn console_application_name(&self) -> String {
@@ -325,7 +325,7 @@ impl RequestMonitoring {
} }
pub(crate) struct LatencyTimerPause<'a> { pub(crate) struct LatencyTimerPause<'a> {
ctx: &'a RequestMonitoring, ctx: &'a RequestContext,
start: tokio::time::Instant, start: tokio::time::Instant,
waiting_for: Waiting, waiting_for: Waiting,
} }
@@ -341,7 +341,7 @@ impl Drop for LatencyTimerPause<'_> {
} }
} }
impl RequestMonitoringInner { impl RequestContextInner {
fn set_cold_start_info(&mut self, info: ColdStartInfo) { fn set_cold_start_info(&mut self, info: ColdStartInfo) {
self.cold_start_info = info; self.cold_start_info = info;
self.latency_timer.cold_start_info(info); self.latency_timer.cold_start_info(info);
@@ -430,7 +430,7 @@ impl RequestMonitoringInner {
} }
} }
impl Drop for RequestMonitoringInner { impl Drop for RequestContextInner {
fn drop(&mut self) { fn drop(&mut self) {
if self.sender.is_some() { if self.sender.is_some() {
self.log_connect(); self.log_connect();

View File

@@ -20,7 +20,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, Span}; use tracing::{debug, info, Span};
use utils::backoff; use utils::backoff;
use super::{RequestMonitoringInner, LOG_CHAN}; use super::{RequestContextInner, LOG_CHAN};
use crate::config::remote_storage_from_toml; use crate::config::remote_storage_from_toml;
use crate::context::LOG_CHAN_DISCONNECT; use crate::context::LOG_CHAN_DISCONNECT;
@@ -117,8 +117,8 @@ impl serde::Serialize for Options<'_> {
} }
} }
impl From<&RequestMonitoringInner> for RequestData { impl From<&RequestContextInner> for RequestData {
fn from(value: &RequestMonitoringInner) -> Self { fn from(value: &RequestContextInner) -> Self {
Self { Self {
session_id: value.session_id, session_id: value.session_id,
peer_addr: value.conn_info.addr.ip().to_string(), peer_addr: value.conn_info.addr.ip().to_string(),

View File

@@ -13,7 +13,7 @@ use crate::auth::backend::jwt::AuthRule;
use crate::auth::backend::ComputeUserInfo; use crate::auth::backend::ComputeUserInfo;
use crate::auth::IpPattern; use crate::auth::IpPattern;
use crate::cache::Cached; use crate::cache::Cached;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::client::{CachedAllowedIps, CachedRoleSecret}; use crate::control_plane::client::{CachedAllowedIps, CachedRoleSecret};
use crate::control_plane::errors::{ use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError, ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
@@ -206,7 +206,7 @@ impl super::ControlPlaneApi for MockControlPlane {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn get_role_secret( async fn get_role_secret(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, GetAuthInfoError> { ) -> Result<CachedRoleSecret, GetAuthInfoError> {
Ok(CachedRoleSecret::new_uncached( Ok(CachedRoleSecret::new_uncached(
@@ -216,7 +216,7 @@ impl super::ControlPlaneApi for MockControlPlane {
async fn get_allowed_ips_and_secret( async fn get_allowed_ips_and_secret(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> { ) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
Ok(( Ok((
@@ -229,7 +229,7 @@ impl super::ControlPlaneApi for MockControlPlane {
async fn get_endpoint_jwks( async fn get_endpoint_jwks(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
endpoint: EndpointId, endpoint: EndpointId,
) -> Result<Vec<AuthRule>, GetEndpointJwksError> { ) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
self.do_get_endpoint_jwks(endpoint).await self.do_get_endpoint_jwks(endpoint).await
@@ -238,7 +238,7 @@ impl super::ControlPlaneApi for MockControlPlane {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn wake_compute( async fn wake_compute(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
_user_info: &ComputeUserInfo, _user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> { ) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute().map_ok(Cached::new_uncached).await self.do_wake_compute().map_ok(Cached::new_uncached).await

View File

@@ -15,7 +15,7 @@ use crate::auth::backend::ComputeUserInfo;
use crate::cache::endpoints::EndpointsCache; use crate::cache::endpoints::EndpointsCache;
use crate::cache::project_info::ProjectInfoCacheImpl; use crate::cache::project_info::ProjectInfoCacheImpl;
use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions}; use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions};
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::{ use crate::control_plane::{
errors, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, NodeInfoCache, errors, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, NodeInfoCache,
}; };
@@ -41,7 +41,7 @@ pub enum ControlPlaneClient {
impl ControlPlaneApi for ControlPlaneClient { impl ControlPlaneApi for ControlPlaneClient {
async fn get_role_secret( async fn get_role_secret(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, errors::GetAuthInfoError> { ) -> Result<CachedRoleSecret, errors::GetAuthInfoError> {
match self { match self {
@@ -57,7 +57,7 @@ impl ControlPlaneApi for ControlPlaneClient {
async fn get_allowed_ips_and_secret( async fn get_allowed_ips_and_secret(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> { ) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> {
match self { match self {
@@ -71,7 +71,7 @@ impl ControlPlaneApi for ControlPlaneClient {
async fn get_endpoint_jwks( async fn get_endpoint_jwks(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
endpoint: EndpointId, endpoint: EndpointId,
) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError> { ) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError> {
match self { match self {
@@ -85,7 +85,7 @@ impl ControlPlaneApi for ControlPlaneClient {
async fn wake_compute( async fn wake_compute(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError> { ) -> Result<CachedNodeInfo, errors::WakeComputeError> {
match self { match self {
@@ -271,7 +271,7 @@ impl WakeComputePermit {
impl FetchAuthRules for ControlPlaneClient { impl FetchAuthRules for ControlPlaneClient {
async fn fetch_auth_rules( async fn fetch_auth_rules(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
endpoint: EndpointId, endpoint: EndpointId,
) -> Result<Vec<AuthRule>, FetchAuthRulesError> { ) -> Result<Vec<AuthRule>, FetchAuthRulesError> {
self.get_endpoint_jwks(ctx, endpoint) self.get_endpoint_jwks(ctx, endpoint)

View File

@@ -14,7 +14,7 @@ use super::super::messages::{ControlPlaneErrorMessage, GetRoleSecret, WakeComput
use crate::auth::backend::jwt::AuthRule; use crate::auth::backend::jwt::AuthRule;
use crate::auth::backend::ComputeUserInfo; use crate::auth::backend::ComputeUserInfo;
use crate::cache::Cached; use crate::cache::Cached;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::caches::ApiCaches; use crate::control_plane::caches::ApiCaches;
use crate::control_plane::errors::{ use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError, ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
@@ -65,7 +65,7 @@ impl NeonControlPlaneClient {
async fn do_get_auth_info( async fn do_get_auth_info(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<AuthInfo, GetAuthInfoError> { ) -> Result<AuthInfo, GetAuthInfoError> {
if !self if !self
@@ -141,7 +141,7 @@ impl NeonControlPlaneClient {
async fn do_get_endpoint_jwks( async fn do_get_endpoint_jwks(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
endpoint: EndpointId, endpoint: EndpointId,
) -> Result<Vec<AuthRule>, GetEndpointJwksError> { ) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
if !self if !self
@@ -200,7 +200,7 @@ impl NeonControlPlaneClient {
async fn do_wake_compute( async fn do_wake_compute(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<NodeInfo, WakeComputeError> { ) -> Result<NodeInfo, WakeComputeError> {
let request_id = ctx.session_id().to_string(); let request_id = ctx.session_id().to_string();
@@ -263,7 +263,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn get_role_secret( async fn get_role_secret(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, GetAuthInfoError> { ) -> Result<CachedRoleSecret, GetAuthInfoError> {
let normalized_ep = &user_info.endpoint.normalize(); let normalized_ep = &user_info.endpoint.normalize();
@@ -297,7 +297,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
async fn get_allowed_ips_and_secret( async fn get_allowed_ips_and_secret(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> { ) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
let normalized_ep = &user_info.endpoint.normalize(); let normalized_ep = &user_info.endpoint.normalize();
@@ -339,7 +339,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn get_endpoint_jwks( async fn get_endpoint_jwks(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
endpoint: EndpointId, endpoint: EndpointId,
) -> Result<Vec<AuthRule>, GetEndpointJwksError> { ) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
self.do_get_endpoint_jwks(ctx, endpoint).await self.do_get_endpoint_jwks(ctx, endpoint).await
@@ -348,7 +348,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn wake_compute( async fn wake_compute(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> { ) -> Result<CachedNodeInfo, WakeComputeError> {
let key = user_info.endpoint_cache_key(); let key = user_info.endpoint_cache_key();

View File

@@ -17,7 +17,7 @@ use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::IpPattern; use crate::auth::IpPattern;
use crate::cache::project_info::ProjectInfoCacheImpl; use crate::cache::project_info::ProjectInfoCacheImpl;
use crate::cache::{Cached, TimedLru}; use crate::cache::{Cached, TimedLru};
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo}; use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
use crate::intern::ProjectIdInt; use crate::intern::ProjectIdInt;
use crate::types::{EndpointCacheKey, EndpointId}; use crate::types::{EndpointCacheKey, EndpointId};
@@ -75,7 +75,7 @@ pub(crate) struct NodeInfo {
impl NodeInfo { impl NodeInfo {
pub(crate) async fn connect( pub(crate) async fn connect(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
timeout: Duration, timeout: Duration,
) -> Result<compute::PostgresConnection, compute::ConnectionError> { ) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.config self.config
@@ -116,26 +116,26 @@ pub(crate) trait ControlPlaneApi {
/// We still have to mock the scram to avoid leaking information that user doesn't exist. /// We still have to mock the scram to avoid leaking information that user doesn't exist.
async fn get_role_secret( async fn get_role_secret(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, errors::GetAuthInfoError>; ) -> Result<CachedRoleSecret, errors::GetAuthInfoError>;
async fn get_allowed_ips_and_secret( async fn get_allowed_ips_and_secret(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError>; ) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError>;
async fn get_endpoint_jwks( async fn get_endpoint_jwks(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
endpoint: EndpointId, endpoint: EndpointId,
) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError>; ) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError>;
/// Wake up the compute node and return the corresponding connection info. /// Wake up the compute node and return the corresponding connection info.
async fn wake_compute( async fn wake_compute(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError>; ) -> Result<CachedNodeInfo, errors::WakeComputeError>;
} }

View File

@@ -7,7 +7,7 @@ use super::retry::ShouldRetryWakeCompute;
use crate::auth::backend::ComputeCredentialKeys; use crate::auth::backend::ComputeCredentialKeys;
use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT}; use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT};
use crate::config::RetryConfig; use crate::config::RetryConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::errors::WakeComputeError; use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::locks::ApiLocks; use crate::control_plane::locks::ApiLocks;
use crate::control_plane::{self, CachedNodeInfo, NodeInfo}; use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
@@ -47,7 +47,7 @@ pub(crate) trait ConnectMechanism {
type Error: From<Self::ConnectError>; type Error: From<Self::ConnectError>;
async fn connect_once( async fn connect_once(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
node_info: &control_plane::CachedNodeInfo, node_info: &control_plane::CachedNodeInfo,
timeout: time::Duration, timeout: time::Duration,
) -> Result<Self::Connection, Self::ConnectError>; ) -> Result<Self::Connection, Self::ConnectError>;
@@ -59,7 +59,7 @@ pub(crate) trait ConnectMechanism {
pub(crate) trait ComputeConnectBackend { pub(crate) trait ComputeConnectBackend {
async fn wake_compute( async fn wake_compute(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError>; ) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError>;
fn get_keys(&self) -> &ComputeCredentialKeys; fn get_keys(&self) -> &ComputeCredentialKeys;
@@ -82,7 +82,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)] #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
async fn connect_once( async fn connect_once(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
node_info: &control_plane::CachedNodeInfo, node_info: &control_plane::CachedNodeInfo,
timeout: time::Duration, timeout: time::Duration,
) -> Result<PostgresConnection, Self::Error> { ) -> Result<PostgresConnection, Self::Error> {
@@ -99,7 +99,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
/// Try to connect to the compute node, retrying if necessary. /// Try to connect to the compute node, retrying if necessary.
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>( pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
ctx: &RequestMonitoring, ctx: &RequestContext,
mechanism: &M, mechanism: &M,
user_info: &B, user_info: &B,
allow_self_signed_compute: bool, allow_self_signed_compute: bool,

View File

@@ -9,7 +9,7 @@ use tracing::{info, warn};
use crate::auth::endpoint_sni; use crate::auth::endpoint_sni;
use crate::config::{TlsConfig, PG_ALPN_PROTOCOL}; use crate::config::{TlsConfig, PG_ALPN_PROTOCOL};
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::error::ReportableError; use crate::error::ReportableError;
use crate::metrics::Metrics; use crate::metrics::Metrics;
use crate::proxy::ERR_INSECURE_CONNECTION; use crate::proxy::ERR_INSECURE_CONNECTION;
@@ -66,7 +66,7 @@ pub(crate) enum HandshakeData<S> {
/// we also take an extra care of propagating only the select handshake errors to client. /// we also take an extra care of propagating only the select handshake errors to client.
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>( pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
ctx: &RequestMonitoring, ctx: &RequestContext,
stream: S, stream: S,
mut tls: Option<&TlsConfig>, mut tls: Option<&TlsConfig>,
record_handshake_error: bool, record_handshake_error: bool,

View File

@@ -25,7 +25,7 @@ use self::connect_compute::{connect_to_compute, TcpMechanism};
use self::passthrough::ProxyPassthrough; use self::passthrough::ProxyPassthrough;
use crate::cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal}; use crate::cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal};
use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig}; use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::error::ReportableError; use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard}; use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo}; use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo};
@@ -117,7 +117,7 @@ pub async fn task_main(
} }
}; };
let ctx = RequestMonitoring::new( let ctx = RequestContext::new(
session_id, session_id,
conn_info, conn_info,
crate::metrics::Protocol::Tcp, crate::metrics::Protocol::Tcp,
@@ -247,7 +247,7 @@ impl ReportableError for ClientRequestError {
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>( pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config: &'static ProxyConfig, config: &'static ProxyConfig,
auth_backend: &'static auth::Backend<'static, ()>, auth_backend: &'static auth::Backend<'static, ()>,
ctx: &RequestMonitoring, ctx: &RequestContext,
cancellation_handler: Arc<CancellationHandlerMain>, cancellation_handler: Arc<CancellationHandlerMain>,
stream: S, stream: S,
mode: ClientMode, mode: ClientMode,

View File

@@ -36,7 +36,7 @@ async fn proxy_mitm(
// begin handshake with end_server // begin handshake with end_server
let end_server = connect_tls(server2, client_config2.make_tls_connect().unwrap()).await; let end_server = connect_tls(server2, client_config2.make_tls_connect().unwrap()).await;
let (end_client, startup) = match handshake( let (end_client, startup) = match handshake(
&RequestMonitoring::test(), &RequestContext::test(),
client1, client1,
Some(&server_config1), Some(&server_config1),
false, false,

View File

@@ -162,7 +162,7 @@ impl TestAuth for Scram {
stream: &mut PqStream<Stream<S>>, stream: &mut PqStream<Stream<S>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let outcome = auth::AuthFlow::new(stream) let outcome = auth::AuthFlow::new(stream)
.begin(auth::Scram(&self.0, &RequestMonitoring::test())) .begin(auth::Scram(&self.0, &RequestContext::test()))
.await? .await?
.authenticate() .authenticate()
.await?; .await?;
@@ -182,11 +182,10 @@ async fn dummy_proxy(
auth: impl TestAuth + Send, auth: impl TestAuth + Send,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let (client, _) = read_proxy_protocol(client).await?; let (client, _) = read_proxy_protocol(client).await?;
let mut stream = let mut stream = match handshake(&RequestContext::test(), client, tls.as_ref(), false).await? {
match handshake(&RequestMonitoring::test(), client, tls.as_ref(), false).await? { HandshakeData::Startup(stream, _) => stream,
HandshakeData::Startup(stream, _) => stream, HandshakeData::Cancel(_) => bail!("cancellation not supported"),
HandshakeData::Cancel(_) => bail!("cancellation not supported"), };
};
auth.authenticate(&mut stream).await?; auth.authenticate(&mut stream).await?;
@@ -466,7 +465,7 @@ impl ConnectMechanism for TestConnectMechanism {
async fn connect_once( async fn connect_once(
&self, &self,
_ctx: &RequestMonitoring, _ctx: &RequestContext,
_node_info: &control_plane::CachedNodeInfo, _node_info: &control_plane::CachedNodeInfo,
_timeout: std::time::Duration, _timeout: std::time::Duration,
) -> Result<Self::Connection, Self::ConnectError> { ) -> Result<Self::Connection, Self::ConnectError> {
@@ -581,7 +580,7 @@ fn helper_create_connect_info(
async fn connect_to_compute_success() { async fn connect_to_compute_success() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
use ConnectAction::*; use ConnectAction::*;
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Connect]); let mechanism = TestConnectMechanism::new(vec![Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism); let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig { let config = RetryConfig {
@@ -599,7 +598,7 @@ async fn connect_to_compute_success() {
async fn connect_to_compute_retry() { async fn connect_to_compute_retry() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
use ConnectAction::*; use ConnectAction::*;
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]); let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism); let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig { let config = RetryConfig {
@@ -618,7 +617,7 @@ async fn connect_to_compute_retry() {
async fn connect_to_compute_non_retry_1() { async fn connect_to_compute_non_retry_1() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
use ConnectAction::*; use ConnectAction::*;
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]); let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]);
let user_info = helper_create_connect_info(&mechanism); let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig { let config = RetryConfig {
@@ -637,7 +636,7 @@ async fn connect_to_compute_non_retry_1() {
async fn connect_to_compute_non_retry_2() { async fn connect_to_compute_non_retry_2() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
use ConnectAction::*; use ConnectAction::*;
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]); let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism); let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig { let config = RetryConfig {
@@ -657,7 +656,7 @@ async fn connect_to_compute_non_retry_3() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
tokio::time::pause(); tokio::time::pause();
use ConnectAction::*; use ConnectAction::*;
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let mechanism = let mechanism =
TestConnectMechanism::new(vec![Wake, Retry, Wake, Retry, Retry, Retry, Retry, Retry]); TestConnectMechanism::new(vec![Wake, Retry, Wake, Retry, Retry, Retry, Retry, Retry]);
let user_info = helper_create_connect_info(&mechanism); let user_info = helper_create_connect_info(&mechanism);
@@ -689,7 +688,7 @@ async fn connect_to_compute_non_retry_3() {
async fn wake_retry() { async fn wake_retry() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
use ConnectAction::*; use ConnectAction::*;
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]); let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism); let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig { let config = RetryConfig {
@@ -708,7 +707,7 @@ async fn wake_retry() {
async fn wake_non_retry() { async fn wake_non_retry() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
use ConnectAction::*; use ConnectAction::*;
let ctx = RequestMonitoring::test(); let ctx = RequestContext::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]); let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]);
let user_info = helper_create_connect_info(&mechanism); let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig { let config = RetryConfig {

View File

@@ -2,7 +2,7 @@ use tracing::{error, info, warn};
use super::connect_compute::ComputeConnectBackend; use super::connect_compute::ComputeConnectBackend;
use crate::config::RetryConfig; use crate::config::RetryConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::errors::WakeComputeError; use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::CachedNodeInfo; use crate::control_plane::CachedNodeInfo;
use crate::error::ReportableError; use crate::error::ReportableError;
@@ -13,7 +13,7 @@ use crate::proxy::retry::{retry_after, should_retry};
pub(crate) async fn wake_compute<B: ComputeConnectBackend>( pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
num_retries: &mut u32, num_retries: &mut u32,
ctx: &RequestMonitoring, ctx: &RequestContext,
api: &B, api: &B,
config: RetryConfig, config: RetryConfig,
) -> Result<CachedNodeInfo, WakeComputeError> { ) -> Result<CachedNodeInfo, WakeComputeError> {

View File

@@ -23,7 +23,7 @@ use crate::compute_ctl::{
ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest, ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
}; };
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::client::ApiLockError; use crate::control_plane::client::ApiLockError;
use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError}; use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
use crate::control_plane::locks::ApiLocks; use crate::control_plane::locks::ApiLocks;
@@ -48,7 +48,7 @@ pub(crate) struct PoolingBackend {
impl PoolingBackend { impl PoolingBackend {
pub(crate) async fn authenticate_with_password( pub(crate) async fn authenticate_with_password(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
password: &[u8], password: &[u8],
) -> Result<ComputeCredentials, AuthError> { ) -> Result<ComputeCredentials, AuthError> {
@@ -110,7 +110,7 @@ impl PoolingBackend {
pub(crate) async fn authenticate_with_jwt( pub(crate) async fn authenticate_with_jwt(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
user_info: &ComputeUserInfo, user_info: &ComputeUserInfo,
jwt: String, jwt: String,
) -> Result<ComputeCredentials, AuthError> { ) -> Result<ComputeCredentials, AuthError> {
@@ -161,7 +161,7 @@ impl PoolingBackend {
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)] #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
pub(crate) async fn connect_to_compute( pub(crate) async fn connect_to_compute(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
conn_info: ConnInfo, conn_info: ConnInfo,
keys: ComputeCredentials, keys: ComputeCredentials,
force_new: bool, force_new: bool,
@@ -201,7 +201,7 @@ impl PoolingBackend {
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)] #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
pub(crate) async fn connect_to_local_proxy( pub(crate) async fn connect_to_local_proxy(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
conn_info: ConnInfo, conn_info: ConnInfo,
) -> Result<http_conn_pool::Client<Send>, HttpConnError> { ) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
info!("pool: looking for an existing connection"); info!("pool: looking for an existing connection");
@@ -249,7 +249,7 @@ impl PoolingBackend {
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)] #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
pub(crate) async fn connect_to_local_postgres( pub(crate) async fn connect_to_local_postgres(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
conn_info: ConnInfo, conn_info: ConnInfo,
) -> Result<Client<tokio_postgres::Client>, HttpConnError> { ) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
if let Some(client) = self.local_pool.get(ctx, &conn_info)? { if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
@@ -490,7 +490,7 @@ impl ConnectMechanism for TokioMechanism {
async fn connect_once( async fn connect_once(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
node_info: &CachedNodeInfo, node_info: &CachedNodeInfo,
timeout: Duration, timeout: Duration,
) -> Result<Self::Connection, Self::ConnectError> { ) -> Result<Self::Connection, Self::ConnectError> {
@@ -540,7 +540,7 @@ impl ConnectMechanism for HyperMechanism {
async fn connect_once( async fn connect_once(
&self, &self,
ctx: &RequestMonitoring, ctx: &RequestContext,
node_info: &CachedNodeInfo, node_info: &CachedNodeInfo,
timeout: Duration, timeout: Duration,
) -> Result<Self::Connection, Self::ConnectError> { ) -> Result<Self::Connection, Self::ConnectError> {

View File

@@ -21,7 +21,7 @@ use {
use super::conn_pool_lib::{ use super::conn_pool_lib::{
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, GlobalConnPool, Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, GlobalConnPool,
}; };
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::messages::MetricsAuxInfo; use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::Metrics; use crate::metrics::Metrics;
@@ -53,7 +53,7 @@ impl fmt::Display for ConnInfo {
pub(crate) fn poll_client<C: ClientInnerExt>( pub(crate) fn poll_client<C: ClientInnerExt>(
global_pool: Arc<GlobalConnPool<C>>, global_pool: Arc<GlobalConnPool<C>>,
ctx: &RequestMonitoring, ctx: &RequestContext,
conn_info: ConnInfo, conn_info: ConnInfo,
client: C, client: C,
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>, mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,

View File

@@ -15,7 +15,7 @@ use super::conn_pool::ClientDataRemote;
use super::http_conn_pool::ClientDataHttp; use super::http_conn_pool::ClientDataHttp;
use super::local_conn_pool::ClientDataLocal; use super::local_conn_pool::ClientDataLocal;
use crate::auth::backend::ComputeUserInfo; use crate::auth::backend::ComputeUserInfo;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::types::{DbName, EndpointCacheKey, RoleName}; use crate::types::{DbName, EndpointCacheKey, RoleName};
@@ -380,7 +380,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
pub(crate) fn get( pub(crate) fn get(
self: &Arc<Self>, self: &Arc<Self>,
ctx: &RequestMonitoring, ctx: &RequestContext,
conn_info: &ConnInfo, conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> { ) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInnerCommon<C>> = None; let mut client: Option<ClientInnerCommon<C>> = None;

View File

@@ -12,7 +12,7 @@ use tracing::{debug, error, info, info_span, Instrument};
use super::backend::HttpConnError; use super::backend::HttpConnError;
use super::conn_pool_lib::{ClientInnerExt, ConnInfo}; use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::types::EndpointCacheKey; use crate::types::EndpointCacheKey;
@@ -212,7 +212,7 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
#[expect(unused_results)] #[expect(unused_results)]
pub(crate) fn get( pub(crate) fn get(
self: &Arc<Self>, self: &Arc<Self>,
ctx: &RequestMonitoring, ctx: &RequestContext,
conn_info: &ConnInfo, conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> { ) -> Result<Option<Client<C>>, HttpConnError> {
let result: Result<Option<Client<C>>, HttpConnError>; let result: Result<Option<Client<C>>, HttpConnError>;
@@ -280,7 +280,7 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
pub(crate) fn poll_http2_client( pub(crate) fn poll_http2_client(
global_pool: Arc<GlobalConnPool<Send>>, global_pool: Arc<GlobalConnPool<Send>>,
ctx: &RequestMonitoring, ctx: &RequestContext,
conn_info: &ConnInfo, conn_info: &ConnInfo,
client: Send, client: Send,
connection: Connect, connection: Connect,

View File

@@ -36,7 +36,7 @@ use super::conn_pool_lib::{
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, DbUserConn, Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, DbUserConn,
EndpointConnPool, EndpointConnPool,
}; };
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics; use crate::metrics::Metrics;
@@ -88,7 +88,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
pub(crate) fn get( pub(crate) fn get(
self: &Arc<Self>, self: &Arc<Self>,
ctx: &RequestMonitoring, ctx: &RequestContext,
conn_info: &ConnInfo, conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> { ) -> Result<Option<Client<C>>, HttpConnError> {
let client = self let client = self
@@ -159,7 +159,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub(crate) fn poll_client<C: ClientInnerExt>( pub(crate) fn poll_client<C: ClientInnerExt>(
global_pool: Arc<LocalConnPool<C>>, global_pool: Arc<LocalConnPool<C>>,
ctx: &RequestMonitoring, ctx: &RequestContext,
conn_info: ConnInfo, conn_info: ConnInfo,
client: C, client: C,
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>, mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,

View File

@@ -45,7 +45,7 @@ use utils::http::error::ApiError;
use crate::cancellation::CancellationHandlerMain; use crate::cancellation::CancellationHandlerMain;
use crate::config::{ProxyConfig, ProxyProtocolV2}; use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::metrics::Metrics; use crate::metrics::Metrics;
use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectHeader, ConnectionInfo}; use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectHeader, ConnectionInfo};
use crate::proxy::run_until_cancelled; use crate::proxy::run_until_cancelled;
@@ -423,7 +423,7 @@ async fn request_handler(
if config.http_config.accept_websockets if config.http_config.accept_websockets
&& framed_websockets::upgrade::is_upgrade_request(&request) && framed_websockets::upgrade::is_upgrade_request(&request)
{ {
let ctx = RequestMonitoring::new( let ctx = RequestContext::new(
session_id, session_id,
conn_info, conn_info,
crate::metrics::Protocol::Ws, crate::metrics::Protocol::Ws,
@@ -458,7 +458,7 @@ async fn request_handler(
// Return the response so the spawned future can continue. // Return the response so the spawned future can continue.
Ok(response.map(|b| b.map_err(|x| match x {}).boxed())) Ok(response.map(|b| b.map_err(|x| match x {}).boxed()))
} else if request.uri().path() == "/sql" && *request.method() == Method::POST { } else if request.uri().path() == "/sql" && *request.method() == Method::POST {
let ctx = RequestMonitoring::new( let ctx = RequestContext::new(
session_id, session_id,
conn_info, conn_info,
crate::metrics::Protocol::Http, crate::metrics::Protocol::Http,

View File

@@ -34,7 +34,7 @@ use super::json::{json_to_pg_text, pg_text_row_to_json, JsonConversionError};
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::{endpoint_sni, ComputeUserInfoParseError}; use crate::auth::{endpoint_sni, ComputeUserInfoParseError};
use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig}; use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::metrics::{HttpDirection, Metrics}; use crate::metrics::{HttpDirection, Metrics};
use crate::proxy::{run_until_cancelled, NeonOptions}; use crate::proxy::{run_until_cancelled, NeonOptions};
@@ -133,7 +133,7 @@ impl UserFacingError for ConnInfoError {
fn get_conn_info( fn get_conn_info(
config: &'static AuthenticationConfig, config: &'static AuthenticationConfig,
ctx: &RequestMonitoring, ctx: &RequestContext,
headers: &HeaderMap, headers: &HeaderMap,
tls: Option<&TlsConfig>, tls: Option<&TlsConfig>,
) -> Result<ConnInfoWithAuth, ConnInfoError> { ) -> Result<ConnInfoWithAuth, ConnInfoError> {
@@ -240,7 +240,7 @@ fn get_conn_info(
pub(crate) async fn handle( pub(crate) async fn handle(
config: &'static ProxyConfig, config: &'static ProxyConfig,
ctx: RequestMonitoring, ctx: RequestContext,
request: Request<Incoming>, request: Request<Incoming>,
backend: Arc<PoolingBackend>, backend: Arc<PoolingBackend>,
cancel: CancellationToken, cancel: CancellationToken,
@@ -516,7 +516,7 @@ fn map_isolation_level_to_headers(level: IsolationLevel) -> Option<HeaderValue>
async fn handle_inner( async fn handle_inner(
cancel: CancellationToken, cancel: CancellationToken,
config: &'static ProxyConfig, config: &'static ProxyConfig,
ctx: &RequestMonitoring, ctx: &RequestContext,
request: Request<Incoming>, request: Request<Incoming>,
backend: Arc<PoolingBackend>, backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, SqlOverHttpError> {
@@ -562,7 +562,7 @@ async fn handle_inner(
async fn handle_db_inner( async fn handle_db_inner(
cancel: CancellationToken, cancel: CancellationToken,
config: &'static ProxyConfig, config: &'static ProxyConfig,
ctx: &RequestMonitoring, ctx: &RequestContext,
request: Request<Incoming>, request: Request<Incoming>,
conn_info: ConnInfo, conn_info: ConnInfo,
auth: AuthData, auth: AuthData,
@@ -733,7 +733,7 @@ pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue {
} }
async fn handle_auth_broker_inner( async fn handle_auth_broker_inner(
ctx: &RequestMonitoring, ctx: &RequestContext,
request: Request<Incoming>, request: Request<Incoming>,
conn_info: ConnInfo, conn_info: ConnInfo,
jwt: String, jwt: String,

View File

@@ -14,7 +14,7 @@ use tracing::warn;
use crate::cancellation::CancellationHandlerMain; use crate::cancellation::CancellationHandlerMain;
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::context::RequestMonitoring; use crate::context::RequestContext;
use crate::error::{io_error, ReportableError}; use crate::error::{io_error, ReportableError};
use crate::metrics::Metrics; use crate::metrics::Metrics;
use crate::proxy::{handle_client, ClientMode, ErrorSource}; use crate::proxy::{handle_client, ClientMode, ErrorSource};
@@ -126,7 +126,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
pub(crate) async fn serve_websocket( pub(crate) async fn serve_websocket(
config: &'static ProxyConfig, config: &'static ProxyConfig,
auth_backend: &'static crate::auth::Backend<'static, ()>, auth_backend: &'static crate::auth::Backend<'static, ()>,
ctx: RequestMonitoring, ctx: RequestContext,
websocket: OnUpgrade, websocket: OnUpgrade,
cancellation_handler: Arc<CancellationHandlerMain>, cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>, endpoint_rate_limiter: Arc<EndpointRateLimiter>,