# proxy: add more http logging (#6726)

## Problem

It's hard to see where time is spent during the HTTP flow.

## Summary of changes

Add a lot more logging around query state, and add a `conn_id` field to the sql-over-http span.
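
This relies on the usual `tracing` pattern of declaring a span field as `Empty` and recording it later, once the value is known. Below is a minimal sketch of that pattern only, not the proxy's actual handler; the function name is made up and it assumes the `tracing` and `uuid` crates:

```rust
use tracing::{field, instrument, Span};

// Sketch: declare `conn_id` as an empty span field up front, then record it
// once the connection id is known. Every log line emitted inside the span
// afterwards carries the `conn_id` field.
#[instrument(name = "sql-over-http", skip_all, fields(conn_id = field::Empty))]
async fn handle_request() {
    // Hypothetical id; the real code takes it from the connection pool.
    let conn_id = uuid::Uuid::new_v4();
    Span::current().record("conn_id", field::display(conn_id));

    tracing::info!("pool: opening a new connection");
}
```

The same `Span::current().record("conn_id", ...)` call is made both when a new connection is opened and when a pooled one is reused, so the field is present either way (see the diffs below).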
Author: Conrad Ludgate
Date: 2024-02-12 15:03:45 +00:00 (committed by GitHub)
Parent: 242dd8398c
Commit: 789a71c4ee
4 changed files with 41 additions and 27 deletions

---

@@ -200,8 +200,9 @@ impl LatencyTimer {
     pub fn success(&mut self) {
         // stop the stopwatch and record the time that we have accumulated
-        let start = self.start.take().expect("latency timer should be started");
-        self.accumulated += start.elapsed();
+        if let Some(start) = self.start.take() {
+            self.accumulated += start.elapsed();
+        }
         // success
         self.outcome = "success";

---

@@ -1,7 +1,7 @@
 use std::{sync::Arc, time::Duration};

 use async_trait::async_trait;
-use tracing::info;
+use tracing::{field::display, info};

 use crate::{
     auth::{backend::ComputeCredentialKeys, check_peer_addr_is_in_list, AuthError},
@@ -15,7 +15,7 @@ use crate::{
     proxy::connect_compute::ConnectMechanism,
 };

-use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool, APP_NAME};
+use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};

 pub struct PoolingBackend {
     pub pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
@@ -81,8 +81,8 @@ impl PoolingBackend {
             return Ok(client);
         }
         let conn_id = uuid::Uuid::new_v4();
-        info!(%conn_id, "pool: opening a new connection '{conn_info}'");
-        ctx.set_application(Some(APP_NAME));
+        tracing::Span::current().record("conn_id", display(conn_id));
+        info!("pool: opening a new connection '{conn_info}'");
         let backend = self
             .config
             .auth_backend

---

@@ -4,7 +4,6 @@ use metrics::IntCounterPairGuard;
 use parking_lot::RwLock;
 use rand::Rng;
 use smallvec::SmallVec;
-use smol_str::SmolStr;
 use std::{collections::HashMap, pin::pin, sync::Arc, sync::Weak, time::Duration};
 use std::{
     fmt,
@@ -31,8 +30,6 @@ use tracing::{info, info_span, Instrument};
 use super::backend::HttpConnError;

-pub const APP_NAME: SmolStr = SmolStr::new_inline("/sql_over_http");
-
 #[derive(Debug, Clone)]
 pub struct ConnInfo {
     pub user_info: ComputeUserInfo,
@@ -379,12 +376,13 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
             info!("pool: cached connection '{conn_info}' is closed, opening a new one");
             return Ok(None);
         } else {
-            info!("pool: reusing connection '{conn_info}'");
-            client.session.send(ctx.session_id)?;
+            tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
             tracing::Span::current().record(
                 "pid",
                 &tracing::field::display(client.inner.get_process_id()),
             );
+            info!("pool: reusing connection '{conn_info}'");
+            client.session.send(ctx.session_id)?;
             ctx.latency_timer.pool_hit();
             ctx.latency_timer.success();
             return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
@@ -577,7 +575,6 @@ pub struct Client<C: ClientInnerExt> {
 }

 pub struct Discard<'a, C: ClientInnerExt> {
-    conn_id: uuid::Uuid,
     conn_info: &'a ConnInfo,
     pool: &'a mut Weak<RwLock<EndpointConnPool<C>>>,
 }
@@ -603,14 +600,7 @@ impl<C: ClientInnerExt> Client<C> {
             span: _,
         } = self;
         let inner = inner.as_mut().expect("client inner should not be removed");
-        (
-            &mut inner.inner,
-            Discard {
-                pool,
-                conn_info,
-                conn_id: inner.conn_id,
-            },
-        )
+        (&mut inner.inner, Discard { pool, conn_info })
     }

     pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
@@ -625,13 +615,13 @@ impl<C: ClientInnerExt> Discard<'_, C> {
     pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
         let conn_info = &self.conn_info;
         if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
-            info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is not idle")
+            info!("pool: throwing away connection '{conn_info}' because connection is not idle")
         }
     }
     pub fn discard(&mut self) {
         let conn_info = &self.conn_info;
         if std::mem::take(self.pool).strong_count() > 0 {
-            info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is potentially in a broken state")
+            info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state")
         }
     }
 }

---

@@ -36,6 +36,7 @@ use crate::error::ReportableError;
 use crate::metrics::HTTP_CONTENT_LENGTH;
 use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
 use crate::proxy::NeonOptions;
+use crate::serverless::backend::HttpConnError;
 use crate::DbName;
 use crate::RoleName;
@@ -305,7 +306,14 @@ pub async fn handle(
     Ok(response)
 }

-#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
+#[instrument(
+    name = "sql-over-http",
+    skip_all,
+    fields(
+        pid = tracing::field::Empty,
+        conn_id = tracing::field::Empty
+    )
+)]
 async fn handle_inner(
     config: &'static ProxyConfig,
     ctx: &mut RequestMonitoring,
@@ -359,12 +367,10 @@ async fn handle_inner(
     let txn_read_only = headers.get(&TXN_READ_ONLY) == Some(&HEADER_VALUE_TRUE);
     let txn_deferrable = headers.get(&TXN_DEFERRABLE) == Some(&HEADER_VALUE_TRUE);

-    let paused = ctx.latency_timer.pause();
     let request_content_length = match request.body().size_hint().upper() {
         Some(v) => v,
         None => MAX_REQUEST_SIZE + 1,
     };
-    drop(paused);
+    info!(request_content_length, "request size in bytes");
     HTTP_CONTENT_LENGTH.observe(request_content_length as f64);
@@ -380,15 +386,20 @@ async fn handle_inner(
         let body = hyper::body::to_bytes(request.into_body())
             .await
             .map_err(anyhow::Error::from)?;
+        info!(length = body.len(), "request payload read");
         let payload: Payload = serde_json::from_slice(&body)?;
         Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly
     };

     let authenticate_and_connect = async {
         let keys = backend.authenticate(ctx, &conn_info).await?;
-        backend
+        let client = backend
             .connect_to_compute(ctx, conn_info, keys, !allow_pool)
-            .await
+            .await?;
+        // not strictly necessary to mark success here,
+        // but it's just insurance for if we forget it somewhere else
+        ctx.latency_timer.success();
+        Ok::<_, HttpConnError>(client)
     };

     // Run both operations in parallel
@@ -420,6 +431,7 @@ async fn handle_inner(
             results
         }
         Payload::Batch(statements) => {
+            info!("starting transaction");
             let (inner, mut discard) = client.inner();
             let mut builder = inner.build_transaction();
             if let Some(isolation_level) = txn_isolation_level {
@@ -449,6 +461,7 @@ async fn handle_inner(
             .await
             {
                 Ok(results) => {
+                    info!("commit");
                     let status = transaction.commit().await.map_err(|e| {
                         // if we cannot commit - for now don't return connection to pool
                         // TODO: get a query status from the error
@@ -459,6 +472,7 @@ async fn handle_inner(
                     results
                 }
                 Err(err) => {
+                    info!("rollback");
                     let status = transaction.rollback().await.map_err(|e| {
                         // if we cannot rollback - for now don't return connection to pool
                         // TODO: get a query status from the error
@@ -533,8 +547,10 @@ async fn query_to_json<T: GenericClient>(
     raw_output: bool,
     default_array_mode: bool,
 ) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
+    info!("executing query");
     let query_params = data.params;
     let row_stream = client.query_raw_txt(&data.query, query_params).await?;
+    info!("finished executing query");

     // Manually drain the stream into a vector to leave row_stream hanging
     // around to get a command tag. Also check that the response is not too
@@ -569,6 +585,13 @@ async fn query_to_json<T: GenericClient>(
     }
     .and_then(|s| s.parse::<i64>().ok());

+    info!(
+        rows = rows.len(),
+        ?ready,
+        command_tag,
+        "finished reading rows"
+    );
+
     let mut fields = vec![];
     let mut columns = vec![];