mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 03:50:37 +00:00
proxy: random changes (#8602)
## Problem 1. Hard to correlate startup parameters with the endpoint that provided them. 2. Some configurations are not needed in the `ProxyConfig` struct. ## Summary of changes Because of some borrow checker fun, I needed to switch to an interior-mutability implementation of our `RequestMonitoring` context system. Using https://docs.rs/try-lock/latest/try_lock/ as a cheap lock for such a use-case (needed to be thread safe). Removed the lock of each startup message, instead just logging only the startup params in a successful handshake. Also removed from values from `ProxyConfig` and kept as arguments. (needed for local-proxy config)
This commit is contained in:
committed by
John Spray
parent
38f184bc91
commit
1f73dfb842
@@ -292,7 +292,7 @@ pub struct NodeInfo {
|
||||
impl NodeInfo {
|
||||
pub async fn connect(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
timeout: Duration,
|
||||
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
|
||||
self.config
|
||||
@@ -330,20 +330,20 @@ pub(crate) trait Api {
|
||||
/// We still have to mock the scram to avoid leaking information that user doesn't exist.
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, errors::GetAuthInfoError>;
|
||||
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError>;
|
||||
|
||||
/// Wake up the compute node and return the corresponding connection info.
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
|
||||
}
|
||||
@@ -363,7 +363,7 @@ pub enum ConsoleBackend {
|
||||
impl Api for ConsoleBackend {
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, errors::GetAuthInfoError> {
|
||||
use ConsoleBackend::*;
|
||||
@@ -378,7 +378,7 @@ impl Api for ConsoleBackend {
|
||||
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> {
|
||||
use ConsoleBackend::*;
|
||||
@@ -393,7 +393,7 @@ impl Api for ConsoleBackend {
|
||||
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, errors::WakeComputeError> {
|
||||
use ConsoleBackend::*;
|
||||
|
||||
@@ -158,7 +158,7 @@ impl super::Api for Api {
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
_ctx: &mut RequestMonitoring,
|
||||
_ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, GetAuthInfoError> {
|
||||
Ok(CachedRoleSecret::new_uncached(
|
||||
@@ -168,7 +168,7 @@ impl super::Api for Api {
|
||||
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
_ctx: &mut RequestMonitoring,
|
||||
_ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
|
||||
Ok((
|
||||
@@ -182,7 +182,7 @@ impl super::Api for Api {
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
_ctx: &mut RequestMonitoring,
|
||||
_ctx: &RequestMonitoring,
|
||||
_user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, WakeComputeError> {
|
||||
self.do_wake_compute().map_ok(Cached::new_uncached).await
|
||||
|
||||
@@ -57,7 +57,7 @@ impl Api {
|
||||
|
||||
async fn do_get_auth_info(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<AuthInfo, GetAuthInfoError> {
|
||||
if !self
|
||||
@@ -69,7 +69,7 @@ impl Api {
|
||||
info!("endpoint is not valid, skipping the request");
|
||||
return Ok(AuthInfo::default());
|
||||
}
|
||||
let request_id = ctx.session_id.to_string();
|
||||
let request_id = ctx.session_id().to_string();
|
||||
let application_name = ctx.console_application_name();
|
||||
async {
|
||||
let request = self
|
||||
@@ -77,7 +77,7 @@ impl Api {
|
||||
.get("proxy_get_role_secret")
|
||||
.header("X-Request-ID", &request_id)
|
||||
.header("Authorization", format!("Bearer {}", &self.jwt))
|
||||
.query(&[("session_id", ctx.session_id)])
|
||||
.query(&[("session_id", ctx.session_id())])
|
||||
.query(&[
|
||||
("application_name", application_name.as_str()),
|
||||
("project", user_info.endpoint.as_str()),
|
||||
@@ -87,7 +87,7 @@ impl Api {
|
||||
|
||||
info!(url = request.url().as_str(), "sending http request");
|
||||
let start = Instant::now();
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
|
||||
let response = self.endpoint.execute(request).await?;
|
||||
drop(pause);
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
@@ -130,10 +130,10 @@ impl Api {
|
||||
|
||||
async fn do_wake_compute(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<NodeInfo, WakeComputeError> {
|
||||
let request_id = ctx.session_id.to_string();
|
||||
let request_id = ctx.session_id().to_string();
|
||||
let application_name = ctx.console_application_name();
|
||||
async {
|
||||
let mut request_builder = self
|
||||
@@ -141,7 +141,7 @@ impl Api {
|
||||
.get("proxy_wake_compute")
|
||||
.header("X-Request-ID", &request_id)
|
||||
.header("Authorization", format!("Bearer {}", &self.jwt))
|
||||
.query(&[("session_id", ctx.session_id)])
|
||||
.query(&[("session_id", ctx.session_id())])
|
||||
.query(&[
|
||||
("application_name", application_name.as_str()),
|
||||
("project", user_info.endpoint.as_str()),
|
||||
@@ -156,7 +156,7 @@ impl Api {
|
||||
|
||||
info!(url = request.url().as_str(), "sending http request");
|
||||
let start = Instant::now();
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
|
||||
let response = self.endpoint.execute(request).await?;
|
||||
drop(pause);
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
@@ -192,7 +192,7 @@ impl super::Api for Api {
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, GetAuthInfoError> {
|
||||
let normalized_ep = &user_info.endpoint.normalize();
|
||||
@@ -226,7 +226,7 @@ impl super::Api for Api {
|
||||
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
|
||||
let normalized_ep = &user_info.endpoint.normalize();
|
||||
@@ -268,7 +268,7 @@ impl super::Api for Api {
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, WakeComputeError> {
|
||||
let key = user_info.endpoint_cache_key();
|
||||
|
||||
Reference in New Issue
Block a user