From faf070f288cf3fa52aefd7381f82d98280eba4d5 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 25 Aug 2023 13:08:45 +0100 Subject: [PATCH] proxy: dont return connection pending (#5107) ## Problem We were returning Pending when a connection had a notice/notification (introduced recently in #5020). When returning pending, the runtime assumes you will call `cx.waker().wake()` in order to continue processing. We weren't doing that, so the connection task would get stuck ## Summary of changes Don't return pending. Loop instead --- proxy/src/http/conn_pool.rs | 42 +++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index c02ec61945..e771e5d7ed 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -408,9 +408,9 @@ async fn connect_to_compute_once( let (tx, mut rx) = tokio::sync::watch::channel(session); let conn_id = uuid::Uuid::new_v4(); - let span = info_span!(parent: None, "connection", %conn_info, %conn_id); + let span = info_span!(parent: None, "connection", %conn_id); span.in_scope(|| { - info!(%session, "new connection"); + info!(%conn_info, %session, "new connection"); }); tokio::spawn( @@ -420,26 +420,28 @@ async fn connect_to_compute_once( info!(%session, "changed session"); } - let message = ready!(connection.poll_message(cx)); + loop { + let message = ready!(connection.poll_message(cx)); - match message { - Some(Ok(AsyncMessage::Notice(notice))) => { - info!(%session, "notice: {}", notice); - Poll::Pending + match message { + Some(Ok(AsyncMessage::Notice(notice))) => { + info!(%session, "notice: {}", notice); + } + Some(Ok(AsyncMessage::Notification(notif))) => { + warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received"); + } + Some(Ok(_)) => { + warn!(%session, "unknown message"); + } + Some(Err(e)) => { + error!(%session, "connection error: {}", e); + return Poll::Ready(()) + } + None => { + info!("connection closed"); + return Poll::Ready(()) + } } - Some(Ok(AsyncMessage::Notification(notif))) => { - warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received"); - Poll::Pending - } - Some(Ok(_)) => { - warn!(%session, "unknown message"); - Poll::Pending - } - Some(Err(e)) => { - error!(%session, "connection error: {}", e); - Poll::Ready(()) - } - None => Poll::Ready(()), } }) .instrument(span)