From 43f9a16e4608789a2c190aa5c32303a0d0182f30 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 24 May 2024 17:56:12 +0100 Subject: [PATCH] proxy: fix websocket buffering (#7878) ## Problem Seems the websocket buffering was broken for large query responses only ## Summary of changes Move buffering until after the underlying stream is ready. Tested locally confirms this fixes the bug. Also fixes the pg-sni-router missing metrics bug --- proxy/src/bin/pg_sni_router.rs | 3 +++ proxy/src/serverless/websocket.rs | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index fb16b76567..e1674049a6 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -9,6 +9,7 @@ use futures::future::Either; use itertools::Itertools; use proxy::config::TlsServerEndPoint; use proxy::context::RequestMonitoring; +use proxy::metrics::{Metrics, ThreadPoolMetrics}; use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled}; use rustls::pki_types::PrivateKeyDer; use tokio::net::TcpListener; @@ -65,6 +66,8 @@ async fn main() -> anyhow::Result<()> { let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); + Metrics::install(Arc::new(ThreadPoolMetrics::new(0))); + let args = cli().get_matches(); let destination: String = args.get_one::("dest").unwrap().parse()?; diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index 61d6d60dbe..7d3153a3c1 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -51,9 +51,10 @@ impl AsyncWrite for WebSocketRw { ) -> Poll> { let this = self.project(); let mut stream = this.stream; - this.send.put(buf); ready!(stream.as_mut().poll_ready(cx).map_err(io_error))?; + + this.send.put(buf); match stream.as_mut().start_send(Frame::binary(this.send.split())) { Ok(()) => Poll::Ready(Ok(buf.len())), Err(e) => Poll::Ready(Err(io_error(e))),