mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 11:00:38 +00:00
[proxy] Implement compute node info cache (#3331)
This patch adds a timed LRU cache implementation and a compute node info cache on top of that. Cache entries might expire on their own (default ttl=5mins) or become invalid due to real-world events, e.g. compute node scale-to-zero event, so we add a connection retry loop with a wake-up call. Solved problems: - [x] Find a decent LRU implementation. - [x] Implement timed LRU on top of that. - [x] Cache results of `proxy_wake_compute` API call. - [x] Don't invalidate newer cache entries for the same key. - [x] Add cmdline configuration knobs (requires some refactoring). - [x] Add failed connection estab metric. - [x] Refactor auth backends to make things simpler (retries, cache placement, etc). - [x] Address review comments (add code comments + cleanup). - [x] Retry `/proxy_wake_compute` if we couldn't connect to a compute (e.g. stalled cache entry). - [x] Add high-level description for `TimedLru`. TODOs (will be addressed later): - [ ] Add cache metrics (hit, spurious hit, miss). - [ ] Synchronize http requests across concurrent per-client tasks (https://github.com/neondatabase/neon/pull/3331#issuecomment-1399216069). - [ ] Cache results of `proxy_get_role_secret` API call.
This commit is contained in:
@@ -2,9 +2,12 @@
|
||||
mod tests;
|
||||
|
||||
use crate::{
|
||||
auth,
|
||||
auth::{self, backend::AuthSuccess},
|
||||
cancellation::{self, CancelMap},
|
||||
compute::{self, PostgresConnection},
|
||||
config::{ProxyConfig, TlsConfig},
|
||||
console::{self, messages::MetricsAuxInfo},
|
||||
error::io_error,
|
||||
stream::{MeasuredStream, PqStream, Stream},
|
||||
};
|
||||
use anyhow::{bail, Context};
|
||||
@@ -14,7 +17,10 @@ use once_cell::sync::Lazy;
|
||||
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{error, info, info_span, Instrument};
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
|
||||
/// Number of times we should retry the `/proxy_wake_compute` http request.
|
||||
const NUM_RETRIES_WAKE_COMPUTE: usize = 1;
|
||||
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
const ERR_PROTO_VIOLATION: &str = "protocol violation";
|
||||
@@ -35,6 +41,15 @@ static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_connection_failures_total",
|
||||
"Number of connection failures (per kind).",
|
||||
&["kind"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_io_bytes_per_client",
|
||||
@@ -82,11 +97,12 @@ pub async fn task_main(
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(tech debt): unite this with its twin below.
|
||||
pub async fn handle_ws_client(
|
||||
config: &ProxyConfig,
|
||||
config: &'static ProxyConfig,
|
||||
cancel_map: &CancelMap,
|
||||
session_id: uuid::Uuid,
|
||||
stream: impl AsyncRead + AsyncWrite + Unpin + Send,
|
||||
stream: impl AsyncRead + AsyncWrite + Unpin,
|
||||
hostname: Option<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
// The `closed` counter will increase when this future is destroyed.
|
||||
@@ -99,7 +115,7 @@ pub async fn handle_ws_client(
|
||||
let hostname = hostname.as_deref();
|
||||
|
||||
// TLS is None here, because the connection is already encrypted.
|
||||
let do_handshake = handshake(stream, None, cancel_map).instrument(info_span!("handshake"));
|
||||
let do_handshake = handshake(stream, None, cancel_map);
|
||||
let (mut stream, params) = match do_handshake.await? {
|
||||
Some(x) => x,
|
||||
None => return Ok(()), // it's a cancellation request
|
||||
@@ -124,10 +140,10 @@ pub async fn handle_ws_client(
|
||||
}
|
||||
|
||||
async fn handle_client(
|
||||
config: &ProxyConfig,
|
||||
config: &'static ProxyConfig,
|
||||
cancel_map: &CancelMap,
|
||||
session_id: uuid::Uuid,
|
||||
stream: impl AsyncRead + AsyncWrite + Unpin + Send,
|
||||
stream: impl AsyncRead + AsyncWrite + Unpin,
|
||||
) -> anyhow::Result<()> {
|
||||
// The `closed` counter will increase when this future is destroyed.
|
||||
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
|
||||
@@ -136,7 +152,7 @@ async fn handle_client(
|
||||
}
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
let do_handshake = handshake(stream, tls, cancel_map).instrument(info_span!("handshake"));
|
||||
let do_handshake = handshake(stream, tls, cancel_map);
|
||||
let (mut stream, params) = match do_handshake.await? {
|
||||
Some(x) => x,
|
||||
None => return Ok(()), // it's a cancellation request
|
||||
@@ -165,6 +181,7 @@ async fn handle_client(
|
||||
/// For better testing experience, `stream` can be any object satisfying the traits.
|
||||
/// It's easier to work with owned `stream` here as we need to upgrade it to TLS;
|
||||
/// we also take an extra care of propagating only the select handshake errors to client.
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
stream: S,
|
||||
mut tls: Option<&TlsConfig>,
|
||||
@@ -226,6 +243,133 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to connect to the compute node once.
|
||||
#[tracing::instrument(name = "connect_once", skip_all)]
|
||||
async fn connect_to_compute_once(
|
||||
node_info: &console::CachedNodeInfo,
|
||||
) -> Result<PostgresConnection, compute::ConnectionError> {
|
||||
// If we couldn't connect, a cached connection info might be to blame
|
||||
// (e.g. the compute node's address might've changed at the wrong time).
|
||||
// Invalidate the cache entry (if any) to prevent subsequent errors.
|
||||
let invalidate_cache = |_: &compute::ConnectionError| {
|
||||
let is_cached = node_info.cached();
|
||||
if is_cached {
|
||||
warn!("invalidating stalled compute node info cache entry");
|
||||
node_info.invalidate();
|
||||
}
|
||||
|
||||
let label = match is_cached {
|
||||
true => "compute_cached",
|
||||
false => "compute_uncached",
|
||||
};
|
||||
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
|
||||
};
|
||||
|
||||
node_info
|
||||
.config
|
||||
.connect()
|
||||
.inspect_err(invalidate_cache)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Try to connect to the compute node, retrying if necessary.
|
||||
/// This function might update `node_info`, so we take it by `&mut`.
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn connect_to_compute(
|
||||
node_info: &mut console::CachedNodeInfo,
|
||||
params: &StartupMessageParams,
|
||||
extra: &console::ConsoleReqExtra<'_>,
|
||||
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
|
||||
) -> Result<PostgresConnection, compute::ConnectionError> {
|
||||
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
|
||||
loop {
|
||||
// Apply startup params to the (possibly, cached) compute node info.
|
||||
node_info.config.set_startup_params(params);
|
||||
match connect_to_compute_once(node_info).await {
|
||||
Err(e) if num_retries > 0 => {
|
||||
info!("compute node's state has changed; requesting a wake-up");
|
||||
match creds.wake_compute(extra).map_err(io_error).await? {
|
||||
// Update `node_info` and try one more time.
|
||||
Some(mut new) => {
|
||||
new.config.reuse_password(&node_info.config);
|
||||
*node_info = new;
|
||||
}
|
||||
// Link auth doesn't work that way, so we just exit.
|
||||
None => return Err(e),
|
||||
}
|
||||
}
|
||||
other => return other,
|
||||
}
|
||||
|
||||
num_retries -= 1;
|
||||
info!("retrying after wake-up ({num_retries} attempts left)");
|
||||
}
|
||||
}
|
||||
|
||||
/// Finish client connection initialization: confirm auth success, send params, etc.
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn prepare_client_connection(
|
||||
node: &compute::PostgresConnection,
|
||||
reported_auth_ok: bool,
|
||||
session: cancellation::Session<'_>,
|
||||
stream: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Register compute's query cancellation token and produce a new, unique one.
|
||||
// The new token (cancel_key_data) will be sent to the client.
|
||||
let cancel_key_data = session.enable_query_cancellation(node.cancel_closure.clone());
|
||||
|
||||
// Report authentication success if we haven't done this already.
|
||||
// Note that we do this only (for the most part) after we've connected
|
||||
// to a compute (see above) which performs its own authentication.
|
||||
if !reported_auth_ok {
|
||||
stream.write_message_noflush(&Be::AuthenticationOk)?;
|
||||
}
|
||||
|
||||
// Forward all postgres connection params to the client.
|
||||
// Right now the implementation is very hacky and inefficent (ideally,
|
||||
// we don't need an intermediate hashmap), but at least it should be correct.
|
||||
for (name, value) in &node.params {
|
||||
// TODO: Theoretically, this could result in a big pile of params...
|
||||
stream.write_message_noflush(&Be::ParameterStatus {
|
||||
name: name.as_bytes(),
|
||||
value: value.as_bytes(),
|
||||
})?;
|
||||
}
|
||||
|
||||
stream
|
||||
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
|
||||
.write_message(&Be::ReadyForQuery)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Forward bytes in both directions (client <-> compute).
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn proxy_pass(
|
||||
client: impl AsyncRead + AsyncWrite + Unpin,
|
||||
compute: impl AsyncRead + AsyncWrite + Unpin,
|
||||
aux: &MetricsAuxInfo,
|
||||
) -> anyhow::Result<()> {
|
||||
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&aux.traffic_labels("tx"));
|
||||
let mut client = MeasuredStream::new(client, |cnt| {
|
||||
// Number of bytes we sent to the client (outbound).
|
||||
m_sent.inc_by(cnt as u64);
|
||||
});
|
||||
|
||||
let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&aux.traffic_labels("rx"));
|
||||
let mut compute = MeasuredStream::new(compute, |cnt| {
|
||||
// Number of bytes the client sent to the compute node (inbound).
|
||||
m_recv.inc_by(cnt as u64);
|
||||
});
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
info!("performing the proxy pass...");
|
||||
let _ = tokio::io::copy_bidirectional(&mut client, &mut compute).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Thin connection context.
|
||||
struct Client<'a, S> {
|
||||
/// The underlying libpq protocol stream.
|
||||
@@ -255,17 +399,17 @@ impl<'a, S> Client<'a, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
/// Let the client authenticate and connect to the designated compute node.
|
||||
async fn connect_to_db(self, session: cancellation::Session<'_>) -> anyhow::Result<()> {
|
||||
let Self {
|
||||
mut stream,
|
||||
creds,
|
||||
mut creds,
|
||||
params,
|
||||
session_id,
|
||||
} = self;
|
||||
|
||||
let extra = auth::ConsoleReqExtra {
|
||||
let extra = console::ConsoleReqExtra {
|
||||
session_id, // aka this connection's id
|
||||
application_name: params.get("application_name"),
|
||||
};
|
||||
@@ -278,54 +422,16 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
|
||||
.instrument(info_span!("auth"))
|
||||
.await?;
|
||||
|
||||
let node = auth_result.value;
|
||||
let (db, cancel_closure) = node
|
||||
.config
|
||||
.connect(params)
|
||||
let AuthSuccess {
|
||||
reported_auth_ok,
|
||||
value: mut node_info,
|
||||
} = auth_result;
|
||||
|
||||
let node = connect_to_compute(&mut node_info, params, &extra, &creds)
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
let cancel_key_data = session.enable_query_cancellation(cancel_closure);
|
||||
|
||||
// Report authentication success if we haven't done this already.
|
||||
// Note that we do this only (for the most part) after we've connected
|
||||
// to a compute (see above) which performs its own authentication.
|
||||
if !auth_result.reported_auth_ok {
|
||||
stream.write_message_noflush(&Be::AuthenticationOk)?;
|
||||
}
|
||||
|
||||
// Forward all postgres connection params to the client.
|
||||
// Right now the implementation is very hacky and inefficent (ideally,
|
||||
// we don't need an intermediate hashmap), but at least it should be correct.
|
||||
for (name, value) in &db.params {
|
||||
// TODO: Theoretically, this could result in a big pile of params...
|
||||
stream.write_message_noflush(&Be::ParameterStatus {
|
||||
name: name.as_bytes(),
|
||||
value: value.as_bytes(),
|
||||
})?;
|
||||
}
|
||||
|
||||
stream
|
||||
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
|
||||
.write_message(&Be::ReadyForQuery)
|
||||
.await?;
|
||||
|
||||
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("tx"));
|
||||
let mut client = MeasuredStream::new(stream.into_inner(), |cnt| {
|
||||
// Number of bytes we sent to the client (outbound).
|
||||
m_sent.inc_by(cnt as u64);
|
||||
});
|
||||
|
||||
let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&node.aux.traffic_labels("rx"));
|
||||
let mut db = MeasuredStream::new(db.stream, |cnt| {
|
||||
// Number of bytes the client sent to the compute node (inbound).
|
||||
m_recv.inc_by(cnt as u64);
|
||||
});
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
info!("performing the proxy pass...");
|
||||
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;
|
||||
|
||||
Ok(())
|
||||
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
|
||||
proxy_pass(stream.into_inner(), node.stream, &node_info.aux).await
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user