From a7ab53c80cea7ab7922ebd2a5a4d71963ffcb3ba Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 15 Mar 2023 11:44:55 +0400 Subject: [PATCH] Forward framed read buf contents to compute before proxy pass. Otherwise they get lost. Normally buffer is empty before proxy pass, but this is not the case with pipeline mode of out npm driver; fixes connection hangup introduced by b80fe41af3e for it. fixes https://github.com/neondatabase/neon/issues/3822 --- libs/pq_proto/src/framed.rs | 6 +++--- proxy/src/proxy.rs | 27 +++++++++++++++++++++------ proxy/src/stream.rs | 5 +++-- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/libs/pq_proto/src/framed.rs b/libs/pq_proto/src/framed.rs index 972730cbab..3cdca45009 100644 --- a/libs/pq_proto/src/framed.rs +++ b/libs/pq_proto/src/framed.rs @@ -63,9 +63,9 @@ impl Framed { &self.stream } - /// Extract the underlying stream. - pub fn into_inner(self) -> S { - self.stream + /// Deconstruct into the underlying stream and read buffer. + pub fn into_inner(self) -> (S, BytesMut) { + (self.stream, self.read_buf) } /// Return new Framed with stream type transformed by async f, for TLS diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index abeff6a33b..efe0e8795b 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -16,7 +16,7 @@ use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCou use once_cell::sync::Lazy; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tracing::{error, info, warn}; use utils::measured_stream::MeasuredStream; @@ -209,9 +209,18 @@ async fn handshake( if let Some(tls) = tls.take() { // Upgrade raw stream into a secure TLS-backed stream. // NOTE: We've consumed `tls`; this fact will be used later. - stream = PqStream::new( - stream.into_inner().upgrade(tls.to_server_config()).await?, - ); + + let (raw, read_buf) = stream.into_inner(); + // TODO: Normally, client doesn't send any data before + // server says TLS handshake is ok and read_buf is empy. + // However, you could imagine pipelining of postgres + // SSLRequest + TLS ClientHello in one hunk similar to + // pipelining in our node js driver. We should probably + // support that by chaining read_buf with the stream. + if !read_buf.is_empty() { + bail!("data is sent before server replied with EncryptionResponse"); + } + stream = PqStream::new(raw.upgrade(tls.to_server_config()).await?); } } _ => bail!(ERR_PROTO_VIOLATION), @@ -443,11 +452,17 @@ impl Client<'_, S> { value: mut node_info, } = auth_result; - let node = connect_to_compute(&mut node_info, params, &extra, &creds) + let mut node = connect_to_compute(&mut node_info, params, &extra, &creds) .or_else(|e| stream.throw_error(e)) .await?; prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?; - proxy_pass(stream.into_inner(), node.stream, &node_info.aux).await + // Before proxy passing, forward to compute whatever data is left in the + // PqStream input buffer. Normally there is none, but our serverless npm + // driver in pipeline mode sends startup, password and first query + // immediately after opening the connection. + let (stream, read_buf) = stream.into_inner(); + node.stream.write_all(&read_buf).await?; + proxy_pass(stream, node.stream, &node_info.aux).await } } diff --git a/proxy/src/stream.rs b/proxy/src/stream.rs index 9dfc435e39..7cb292ed58 100644 --- a/proxy/src/stream.rs +++ b/proxy/src/stream.rs @@ -1,5 +1,6 @@ use crate::error::UserFacingError; use anyhow::bail; +use bytes::BytesMut; use pin_project_lite::pin_project; use pq_proto::framed::{ConnectionError, Framed}; use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError}; @@ -27,8 +28,8 @@ impl PqStream { } } - /// Extract the underlying stream. - pub fn into_inner(self) -> S { + /// Extract the underlying stream and read buffer. + pub fn into_inner(self) -> (S, BytesMut) { self.framed.into_inner() }