chore(proxy): discard request context span during passthrough (#9882)

## Problem

`RequestContext::span` shouldn't live for the entire Postgres
connection; it should only cover the handshake.

## Summary of changes

* Slightly refactor `RequestContext` to discard the span upon
  handshake completion (see the sketch below).
* Make sure the temporary future for the handshake is dropped rather
  than bound to a variable.
* Run our nightly fmt script.
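
For context, here is a rough sketch of the lifetime this change aims for (simplified, hypothetical types; not the proxy's real `RequestContext` API): the context owns the handshake span, `log_connect` consumes the context when the handshake completes, and a lightweight guard takes over for passthrough.

```rust
use tracing::{info_span, Span};

/// Simplified stand-in for the proxy's request context (hypothetical).
struct RequestContext {
    span: Span,
}

/// Holds the now span-free context; logs the disconnect when dropped.
struct DisconnectLogger(RequestContext);

impl Drop for DisconnectLogger {
    fn drop(&mut self) {
        tracing::info!("client disconnected");
    }
}

impl RequestContext {
    fn new() -> Self {
        Self {
            span: info_span!("handshake"),
        }
    }

    /// Consumes the context. The handshake span is discarded here, so the
    /// long-lived passthrough phase no longer keeps it alive.
    fn log_connect(mut self) -> DisconnectLogger {
        tracing::info!(parent: &self.span, "client connected");
        self.span = Span::none(); // close the span at handshake completion
        DisconnectLogger(self)
    }
}
```

Because `log_connect` takes `self` by value, the compiler guarantees that nothing can touch the handshake span once passthrough begins.
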
Author: Conrad Ludgate
Date: 2024-11-25 21:32:53 +00:00
Committed by: GitHub
Parent: a74ab9338d
Commit: 96a1b71c84
6 changed files with 59 additions and 55 deletions

```diff
@@ -1,7 +1,8 @@
-use std::net::SocketAddr;
+use std::net::{IpAddr, SocketAddr};
 use std::sync::Arc;
 
 use dashmap::DashMap;
+use ipnet::{IpNet, Ipv4Net, Ipv6Net};
 use pq_proto::CancelKeyData;
 use thiserror::Error;
 use tokio::net::TcpStream;
@@ -17,9 +18,6 @@ use crate::rate_limiter::LeakyBucketRateLimiter;
 use crate::redis::cancellation_publisher::{
     CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
 };
-use std::net::IpAddr;
-
-use ipnet::{IpNet, Ipv4Net, Ipv6Net};
 
 pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
 pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
```

```diff
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use futures::TryFutureExt;
+use futures::{FutureExt, TryFutureExt};
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, Instrument};
@@ -88,40 +88,37 @@ pub async fn task_main(
                 crate::metrics::Protocol::Tcp,
                 &config.region,
             );
-            let span = ctx.span();
-            let startup = Box::pin(
-                handle_client(
-                    config,
-                    backend,
-                    &ctx,
-                    cancellation_handler,
-                    socket,
-                    conn_gauge,
-                )
-                .instrument(span.clone()),
-            );
-            let res = startup.await;
-
+            let res = handle_client(
+                config,
+                backend,
+                &ctx,
+                cancellation_handler,
+                socket,
+                conn_gauge,
+            )
+            .instrument(ctx.span())
+            .boxed()
+            .await;
 
             match res {
                 Err(e) => {
                     // todo: log and push to ctx the error kind
                     ctx.set_error_kind(e.get_error_kind());
-                    error!(parent: &span, "per-client task finished with an error: {e:#}");
+                    error!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
                 }
                 Ok(None) => {
                     ctx.set_success();
                 }
                 Ok(Some(p)) => {
                     ctx.set_success();
-                    ctx.log_connect();
-                    match p.proxy_pass().instrument(span.clone()).await {
+                    let _disconnect = ctx.log_connect();
+                    match p.proxy_pass().await {
                         Ok(()) => {}
                         Err(ErrorSource::Client(e)) => {
-                            error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
+                            error!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
                         }
                         Err(ErrorSource::Compute(e)) => {
-                            error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
+                            error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}");
                         }
                     }
                 }
@@ -219,6 +216,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
         client: stream,
         aux: node.aux.clone(),
         compute: node,
+        session_id: ctx.session_id(),
         _req: request_gauge,
         _conn: conn_gauge,
         _cancel: session,
```
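
The call-site change follows one pattern in both proxy entry points: the handshake future is instrumented, boxed, and awaited as a temporary, so the future and the `Span` clone captured by `instrument` are dropped the moment the handshake resolves. A self-contained sketch of the same shape (the `handshake` function is a hypothetical stand-in):

```rust
use futures::FutureExt;
use tracing::Instrument;

async fn handshake() -> std::io::Result<()> {
    // Stand-in for the real client handshake.
    Ok(())
}

async fn connection_task() -> std::io::Result<()> {
    // Awaiting the expression directly (instead of binding the boxed future
    // to a variable first) makes it a temporary that is dropped, along with
    // its captured span, as soon as the await completes.
    let res = handshake()
        .instrument(tracing::info_span!("handshake"))
        .boxed()
        .await;

    // ...the long-lived passthrough would run here, span-free...
    res
}
```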

```diff
@@ -272,11 +272,14 @@ impl RequestContext {
         this.success = true;
     }
 
-    pub fn log_connect(&self) {
-        self.0
-            .try_lock()
-            .expect("should not deadlock")
-            .log_connect();
+    pub fn log_connect(self) -> DisconnectLogger {
+        let mut this = self.0.into_inner();
+        this.log_connect();
+
+        // close current span.
+        this.span = Span::none();
+
+        DisconnectLogger(this)
     }
 
     pub(crate) fn protocol(&self) -> Protocol {
@@ -434,8 +437,14 @@ impl Drop for RequestContextInner {
     fn drop(&mut self) {
         if self.sender.is_some() {
             self.log_connect();
+        } else {
+            self.log_disconnect();
         }
     }
 }
+
+pub struct DisconnectLogger(RequestContextInner);
+
+impl Drop for DisconnectLogger {
+    fn drop(&mut self) {
+        self.0.log_disconnect();
+    }
+}
```
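
The `Drop` changes split logging into two paths: if the handshake never completed, `RequestContextInner::drop` emits both events itself; if it did, the returned `DisconnectLogger` guard defers the disconnect log until passthrough ends. A runnable toy version of the guard pattern (names mirror the diff, internals simplified):

```rust
struct RequestContextInner;

impl RequestContextInner {
    fn log_connect(&mut self) {
        println!("connect"); // stand-in for the real connect log
    }
    fn log_disconnect(&mut self) {
        println!("disconnect"); // stand-in for the real disconnect log
    }
}

/// Guard that defers the disconnect log until it is dropped.
struct DisconnectLogger(RequestContextInner);

impl Drop for DisconnectLogger {
    fn drop(&mut self) {
        self.0.log_disconnect();
    }
}

fn main() {
    let mut inner = RequestContextInner;
    inner.log_connect();
    let _guard = DisconnectLogger(inner);
    // ...passthrough work happens here...
} // `_guard` drops: "disconnect" is logged exactly once, even on early exit
```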

```diff
@@ -10,7 +10,7 @@ pub(crate) mod wake_compute;
 use std::sync::Arc;
 
 pub use copy_bidirectional::{copy_bidirectional_client_compute, ErrorSource};
-use futures::TryFutureExt;
+use futures::{FutureExt, TryFutureExt};
 use itertools::Itertools;
 use once_cell::sync::OnceCell;
 use pq_proto::{BeMessage as Be, StartupMessageParams};
@@ -123,42 +123,39 @@ pub async fn task_main(
                 crate::metrics::Protocol::Tcp,
                 &config.region,
             );
-            let span = ctx.span();
-            let startup = Box::pin(
-                handle_client(
-                    config,
-                    auth_backend,
-                    &ctx,
-                    cancellation_handler,
-                    socket,
-                    ClientMode::Tcp,
-                    endpoint_rate_limiter2,
-                    conn_gauge,
-                )
-                .instrument(span.clone()),
-            );
-            let res = startup.await;
-
+            let res = handle_client(
+                config,
+                auth_backend,
+                &ctx,
+                cancellation_handler,
+                socket,
+                ClientMode::Tcp,
+                endpoint_rate_limiter2,
+                conn_gauge,
+            )
+            .instrument(ctx.span())
+            .boxed()
+            .await;
 
             match res {
                 Err(e) => {
                     // todo: log and push to ctx the error kind
                     ctx.set_error_kind(e.get_error_kind());
-                    warn!(parent: &span, "per-client task finished with an error: {e:#}");
+                    warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
                 }
                 Ok(None) => {
                     ctx.set_success();
                 }
                 Ok(Some(p)) => {
                     ctx.set_success();
-                    ctx.log_connect();
-                    match p.proxy_pass().instrument(span.clone()).await {
+                    let _disconnect = ctx.log_connect();
+                    match p.proxy_pass().await {
                         Ok(()) => {}
                         Err(ErrorSource::Client(e)) => {
-                            warn!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
+                            warn!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
                         }
                         Err(ErrorSource::Compute(e)) => {
-                            error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
+                            error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}");
                         }
                     }
                 }
@@ -352,6 +349,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
         client: stream,
         aux: node.aux.clone(),
         compute: node,
+        session_id: ctx.session_id(),
         _req: request_gauge,
         _conn: conn_gauge,
         _cancel: session,
```

```diff
@@ -59,6 +59,7 @@ pub(crate) struct ProxyPassthrough<P, S> {
     pub(crate) client: Stream<S>,
     pub(crate) compute: PostgresConnection,
     pub(crate) aux: MetricsAuxInfo,
+    pub(crate) session_id: uuid::Uuid,
 
     pub(crate) _req: NumConnectionRequestsGuard<'static>,
     pub(crate) _conn: NumClientConnectionsGuard<'static>,
@@ -69,7 +70,7 @@ impl<P, S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<P, S> {
     pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
         let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
         if let Err(err) = self.compute.cancel_closure.try_cancel_query().await {
-            tracing::warn!(?err, "could not cancel the query in the database");
+            tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
         }
         res
     }
```
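
With the span discarded during passthrough, the session id now travels as an explicit struct field and is attached to events as a structured field. A minimal sketch of that logging shape (the `session_id = ?value` syntax is standard `tracing` usage; the surrounding function is hypothetical):

```rust
use uuid::Uuid;

fn warn_cancel_failed(session_id: Uuid, err: &std::io::Error) {
    // `field = ?value` records a Debug-formatted structured field on the
    // event, restoring the context the dropped span used to carry.
    tracing::warn!(session_id = ?session_id, ?err, "could not cancel the query in the database");
}
```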

```diff
@@ -1,6 +1,6 @@
-use core::net::IpAddr;
 use std::sync::Arc;
 
+use core::net::IpAddr;
 use pq_proto::CancelKeyData;
 use redis::AsyncCommands;
 use tokio::sync::Mutex;
```