From 0ad08eb138f07f42554767de3fbe184f5ff3e005 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 15 Mar 2023 11:44:55 +0400 Subject: [PATCH] Forward framed read buf contents to compute before proxy pass. Otherwise they get lost. Normally buffer is empty before proxy pass, but this is not the case with pipeline mode of out npm driver; fixes connection hangup introduced by b80fe41af3e for it. --- libs/pq_proto/src/framed.rs | 6 +++--- proxy/src/proxy.rs | 19 +++++++++++++------ proxy/src/stream.rs | 5 +++-- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/libs/pq_proto/src/framed.rs b/libs/pq_proto/src/framed.rs index 972730cbab..3cdca45009 100644 --- a/libs/pq_proto/src/framed.rs +++ b/libs/pq_proto/src/framed.rs @@ -63,9 +63,9 @@ impl Framed { &self.stream } - /// Extract the underlying stream. - pub fn into_inner(self) -> S { - self.stream + /// Deconstruct into the underlying stream and read buffer. + pub fn into_inner(self) -> (S, BytesMut) { + (self.stream, self.read_buf) } /// Return new Framed with stream type transformed by async f, for TLS diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index abeff6a33b..f024b1c1fc 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -16,7 +16,7 @@ use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCou use once_cell::sync::Lazy; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tracing::{error, info, warn}; use utils::measured_stream::MeasuredStream; @@ -209,9 +209,10 @@ async fn handshake( if let Some(tls) = tls.take() { // Upgrade raw stream into a secure TLS-backed stream. // NOTE: We've consumed `tls`; this fact will be used later. - stream = PqStream::new( - stream.into_inner().upgrade(tls.to_server_config()).await?, - ); + // Client shouldn't send any data before TLS handshake, + // so ignore pqstream read buffer. + let (raw, _) = stream.into_inner(); + stream = PqStream::new(raw.upgrade(tls.to_server_config()).await?); } } _ => bail!(ERR_PROTO_VIOLATION), @@ -443,11 +444,17 @@ impl Client<'_, S> { value: mut node_info, } = auth_result; - let node = connect_to_compute(&mut node_info, params, &extra, &creds) + let mut node = connect_to_compute(&mut node_info, params, &extra, &creds) .or_else(|e| stream.throw_error(e)) .await?; prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?; - proxy_pass(stream.into_inner(), node.stream, &node_info.aux).await + // Before proxy passing, forward to compute whatever data is left in the + // PqStream input buffer. Normally there is none, but our serverless npm + // driver in pipeline mode sends startup, password and first query + // immediately after opening the connection. + let (stream, read_buf) = stream.into_inner(); + node.stream.write_all(&read_buf).await?; + proxy_pass(stream, node.stream, &node_info.aux).await } } diff --git a/proxy/src/stream.rs b/proxy/src/stream.rs index 9dfc435e39..7cb292ed58 100644 --- a/proxy/src/stream.rs +++ b/proxy/src/stream.rs @@ -1,5 +1,6 @@ use crate::error::UserFacingError; use anyhow::bail; +use bytes::BytesMut; use pin_project_lite::pin_project; use pq_proto::framed::{ConnectionError, Framed}; use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError}; @@ -27,8 +28,8 @@ impl PqStream { } } - /// Extract the underlying stream. - pub fn into_inner(self) -> S { + /// Extract the underlying stream and read buffer. + pub fn into_inner(self) -> (S, BytesMut) { self.framed.into_inner() }