proxy: pool connection logs (#5020)

## Problem

Errors and notices that happen during a pooled connection's lifecycle carry no session identifiers.

## Summary of changes

Using a watch channel, we update the session ID seen by the connection task whenever it changes, so the status of a pooled connection can always be tied back to the session that is currently using it.

We also add a connection ID so that the entire lifecycle of a connection can be searched in the logs.
Conrad Ludgate · 2023-08-18 11:44:08 +01:00 · committed by GitHub
parent 67af24191e · commit ec10838aa4
4 changed files with 95 additions and 34 deletions
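
To make the approach in the summary concrete, here is a minimal, self-contained sketch of the watch-channel pattern the change relies on. It is illustrative only: names such as `drive_connection` are made up, and a sleep stands in for polling the real `tokio_postgres` connection. It shows how a long-lived connection task can pick up the session ID of whichever request is currently using the pooled connection.

```rust
// A minimal sketch of the watch-channel idea (not the proxy's actual code):
// the pool stores a `watch::Sender<Uuid>` next to each pooled connection, and
// the background task that drives the connection re-reads the current session
// ID whenever it changes, so its log lines always name the right session.
// Assumes the `tokio` (with "full" features) and `uuid` (with "v4") crates.
use std::time::Duration;

use tokio::sync::watch;
use uuid::Uuid;

async fn drive_connection(mut rx: watch::Receiver<Uuid>) {
    // A stable ID for the whole connection lifecycle, independent of sessions.
    let conn_id = Uuid::new_v4();
    let mut session = *rx.borrow();
    loop {
        tokio::select! {
            // The pool handed this connection to a different request/session.
            Ok(()) = rx.changed() => {
                session = *rx.borrow_and_update();
                println!("conn {conn_id}: now serving session {session}");
            }
            // In the real code this arm polls tokio_postgres's `Connection`
            // for notices and errors; a sleep stands in for that work here.
            _ = tokio::time::sleep(Duration::from_millis(500)) => {
                println!("conn {conn_id}: idle, session {session}");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(Uuid::new_v4());
    tokio::spawn(drive_connection(rx));

    // Later, when the pooled connection is reused for another session:
    tokio::time::sleep(Duration::from_secs(1)).await;
    tx.send(Uuid::new_v4()).expect("connection task still running");
    tokio::time::sleep(Duration::from_secs(1)).await;
}
```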


@@ -62,7 +62,6 @@ pub(super) async fn authenticate(
}
};
info!("compute node's state has likely changed; requesting a wake-up");
let mut num_retries = 0;
let mut node = loop {
let wake_res = api.wake_compute(extra, creds).await;


@@ -1,16 +1,21 @@
use anyhow::Context;
use async_trait::async_trait;
use dashmap::DashMap;
use futures::future::poll_fn;
use parking_lot::RwLock;
use pbkdf2::{
password_hash::{PasswordHashString, PasswordHasher, PasswordVerifier, SaltString},
Params, Pbkdf2,
};
use pq_proto::StartupMessageParams;
use std::fmt;
use std::sync::atomic::{self, AtomicUsize};
use std::{collections::HashMap, sync::Arc};
use std::{
fmt,
task::{ready, Poll},
};
use tokio::time;
use tokio_postgres::AsyncMessage;
use crate::{auth, console};
use crate::{compute, config};
@@ -19,8 +24,8 @@ use super::sql_over_http::MAX_RESPONSE_SIZE;
use crate::proxy::ConnectMechanism;
use tracing::error;
use tracing::info;
use tracing::{error, warn};
use tracing::{info, info_span, Instrument};
pub const APP_NAME: &str = "sql_over_http";
const MAX_CONNS_PER_ENDPOINT: usize = 20;
@@ -48,7 +53,7 @@ impl fmt::Display for ConnInfo {
}
struct ConnPoolEntry {
conn: tokio_postgres::Client,
conn: Client,
_last_access: std::time::Instant,
}
@@ -110,8 +115,9 @@ impl GlobalConnPool {
&self,
conn_info: &ConnInfo,
force_new: bool,
) -> anyhow::Result<tokio_postgres::Client> {
let mut client: Option<tokio_postgres::Client> = None;
session_id: uuid::Uuid,
) -> anyhow::Result<Client> {
let mut client: Option<Client> = None;
let mut hash_valid = false;
if !force_new {
@@ -153,16 +159,17 @@ impl GlobalConnPool {
// ok return cached connection if found and establish a new one otherwise
let new_client = if let Some(client) = client {
if client.is_closed() {
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
connect_to_compute(self.proxy_config, conn_info).await
connect_to_compute(self.proxy_config, conn_info, session_id).await
} else {
info!("pool: reusing connection '{conn_info}'");
client.session.send(session_id)?;
return Ok(client);
}
} else {
info!("pool: opening a new connection '{conn_info}'");
connect_to_compute(self.proxy_config, conn_info).await
connect_to_compute(self.proxy_config, conn_info, session_id).await
};
match &new_client {
@@ -201,11 +208,7 @@ impl GlobalConnPool {
new_client
}
pub async fn put(
&self,
conn_info: &ConnInfo,
client: tokio_postgres::Client,
) -> anyhow::Result<()> {
pub async fn put(&self, conn_info: &ConnInfo, client: Client) -> anyhow::Result<()> {
let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
// return connection to the pool
@@ -282,11 +285,12 @@ impl GlobalConnPool {
struct TokioMechanism<'a> {
conn_info: &'a ConnInfo,
session_id: uuid::Uuid,
}
#[async_trait]
impl ConnectMechanism for TokioMechanism<'_> {
type Connection = tokio_postgres::Client;
type Connection = Client;
type ConnectError = tokio_postgres::Error;
type Error = anyhow::Error;
@@ -295,7 +299,7 @@ impl ConnectMechanism for TokioMechanism<'_> {
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<Self::Connection, Self::ConnectError> {
connect_to_compute_once(node_info, self.conn_info, timeout).await
connect_to_compute_once(node_info, self.conn_info, timeout, self.session_id).await
}
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
@@ -308,7 +312,8 @@ impl ConnectMechanism for TokioMechanism<'_> {
async fn connect_to_compute(
config: &config::ProxyConfig,
conn_info: &ConnInfo,
) -> anyhow::Result<tokio_postgres::Client> {
session_id: uuid::Uuid,
) -> anyhow::Result<Client> {
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());
@@ -339,17 +344,27 @@ async fn connect_to_compute(
.await?
.context("missing cache entry from wake_compute")?;
crate::proxy::connect_to_compute(&TokioMechanism { conn_info }, node_info, &extra, &creds).await
crate::proxy::connect_to_compute(
&TokioMechanism {
conn_info,
session_id,
},
node_info,
&extra,
&creds,
)
.await
}
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
conn_info: &ConnInfo,
timeout: time::Duration,
) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
mut session: uuid::Uuid,
) -> Result<Client, tokio_postgres::Error> {
let mut config = (*node_info.config).clone();
let (client, connection) = config
let (client, mut connection) = config
.user(&conn_info.username)
.password(&conn_info.password)
.dbname(&conn_info.dbname)
@@ -358,11 +373,53 @@ async fn connect_to_compute_once(
.connect(tokio_postgres::NoTls)
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection error: {}", e);
}
let (tx, mut rx) = tokio::sync::watch::channel(session);
let conn_id = uuid::Uuid::new_v4();
let span = info_span!(parent: None, "connection", %conn_info, %conn_id);
span.in_scope(|| {
info!(%session, "new connection");
});
Ok(client)
tokio::spawn(
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
}
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
Poll::Pending
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
Poll::Pending
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
Poll::Pending
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
Poll::Ready(())
}
None => Poll::Ready(()),
}
})
.instrument(span)
);
Ok(Client {
inner: client,
session: tx,
})
}
pub struct Client {
pub inner: tokio_postgres::Client,
session: tokio::sync::watch::Sender<uuid::Uuid>,
}
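
The hunk above also gives each connection task its own root span via `info_span!(parent: None, ...)`, tagged with a fresh `conn_id`, and attaches the (changing) session ID to individual log events rather than to the span. A small stand-alone sketch of that logging shape, assuming the `tracing`, `tracing-subscriber`, `tokio`, and `uuid` crates:

```rust
use tracing::{info, info_span, Instrument};
use uuid::Uuid;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // One span per physical connection; `parent: None` detaches it from the
    // request span so the task's logs stand on their own, searchable by conn_id.
    let conn_id = Uuid::new_v4();
    let span = info_span!(parent: None, "connection", %conn_id);

    tokio::spawn(
        async move {
            // The session is a per-event field because it changes over the
            // connection's lifetime as the pool hands it to new requests.
            let session = Uuid::new_v4();
            info!(%session, "new connection");
            info!(%session, "notice: example server notice");
        }
        .instrument(span),
    )
    .await
    .expect("logging task panicked");
}
```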


@@ -16,6 +16,7 @@ use tokio_postgres::types::Type;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::Row;
use tracing::Instrument;
use url::Url;
use super::conn_pool::ConnInfo;
@@ -181,6 +182,7 @@ pub async fn handle(
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> anyhow::Result<(Value, HashMap<HeaderName, HeaderValue>)> {
//
// Determine the destination and connection params
@@ -230,18 +232,18 @@ pub async fn handle(
let body = hyper::body::to_bytes(request.into_body()).await?;
let payload: Payload = serde_json::from_slice(&body)?;
let mut client = conn_pool.get(&conn_info, !allow_pool).await?;
let mut client = conn_pool.get(&conn_info, !allow_pool, session_id).await?;
//
// Now execute the query and return the result
//
let result = match payload {
Payload::Single(query) => query_to_json(&client, query, raw_output, array_mode)
Payload::Single(query) => query_to_json(&client.inner, query, raw_output, array_mode)
.await
.map(|x| (x, HashMap::default())),
Payload::Batch(batch_query) => {
let mut results = Vec::new();
let mut builder = client.build_transaction();
let mut builder = client.inner.build_transaction();
if let Some(isolation_level) = txn_isolation_level {
builder = builder.isolation_level(isolation_level);
}
@@ -285,9 +287,12 @@ pub async fn handle(
if allow_pool {
// return connection to the pool
tokio::task::spawn(async move {
let _ = conn_pool.put(&conn_info, client).await;
});
tokio::task::spawn(
async move {
let _ = conn_pool.put(&conn_info, client).await;
}
.in_current_span(),
);
}
result


@@ -203,7 +203,7 @@ async fn ws_handler(
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
let result = sql_over_http::handle(request, sni_hostname, conn_pool)
let result = sql_over_http::handle(request, sni_hostname, conn_pool, session_id)
.instrument(info_span!("sql-over-http"))
.await;
let status_code = match result {
@@ -307,7 +307,7 @@ pub async fn task_main(
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = format_args!("{session_id}")
session = %session_id
))
.await
}