mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 14:32:57 +00:00
proxy: dont return connection pending (#5107)
## Problem We were returning Pending when a connection had a notice/notification (introduced recently in #5020). When returning pending, the runtime assumes you will call `cx.waker().wake()` in order to continue processing. We weren't doing that, so the connection task would get stuck ## Summary of changes Don't return pending. Loop instead
This commit is contained in:
@@ -408,9 +408,9 @@ async fn connect_to_compute_once(
|
||||
let (tx, mut rx) = tokio::sync::watch::channel(session);
|
||||
|
||||
let conn_id = uuid::Uuid::new_v4();
|
||||
let span = info_span!(parent: None, "connection", %conn_info, %conn_id);
|
||||
let span = info_span!(parent: None, "connection", %conn_id);
|
||||
span.in_scope(|| {
|
||||
info!(%session, "new connection");
|
||||
info!(%conn_info, %session, "new connection");
|
||||
});
|
||||
|
||||
tokio::spawn(
|
||||
@@ -420,26 +420,28 @@ async fn connect_to_compute_once(
|
||||
info!(%session, "changed session");
|
||||
}
|
||||
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
Poll::Pending
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
return Poll::Ready(())
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
Poll::Pending
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
Poll::Pending
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
Poll::Ready(())
|
||||
}
|
||||
None => Poll::Ready(()),
|
||||
}
|
||||
})
|
||||
.instrument(span)
|
||||
|
||||
Reference in New Issue
Block a user