mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
Forward framed read buf contents to compute before proxy pass.
Otherwise they get lost. Normally buffer is empty before proxy pass, but this is
not the case with pipeline mode of out npm driver; fixes connection hangup
introduced by b80fe41af3 for it.
This commit is contained in:
@@ -63,9 +63,9 @@ impl<S> Framed<S> {
|
||||
&self.stream
|
||||
}
|
||||
|
||||
/// Extract the underlying stream.
|
||||
pub fn into_inner(self) -> S {
|
||||
self.stream
|
||||
/// Deconstruct into the underlying stream and read buffer.
|
||||
pub fn into_inner(self) -> (S, BytesMut) {
|
||||
(self.stream, self.read_buf)
|
||||
}
|
||||
|
||||
/// Return new Framed with stream type transformed by async f, for TLS
|
||||
|
||||
@@ -16,7 +16,7 @@ use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCou
|
||||
use once_cell::sync::Lazy;
|
||||
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
use tracing::{error, info, warn};
|
||||
use utils::measured_stream::MeasuredStream;
|
||||
|
||||
@@ -209,9 +209,10 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
if let Some(tls) = tls.take() {
|
||||
// Upgrade raw stream into a secure TLS-backed stream.
|
||||
// NOTE: We've consumed `tls`; this fact will be used later.
|
||||
stream = PqStream::new(
|
||||
stream.into_inner().upgrade(tls.to_server_config()).await?,
|
||||
);
|
||||
// Client shouldn't send any data before TLS handshake,
|
||||
// so ignore pqstream read buffer.
|
||||
let (raw, _) = stream.into_inner();
|
||||
stream = PqStream::new(raw.upgrade(tls.to_server_config()).await?);
|
||||
}
|
||||
}
|
||||
_ => bail!(ERR_PROTO_VIOLATION),
|
||||
@@ -443,11 +444,17 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
value: mut node_info,
|
||||
} = auth_result;
|
||||
|
||||
let node = connect_to_compute(&mut node_info, params, &extra, &creds)
|
||||
let mut node = connect_to_compute(&mut node_info, params, &extra, &creds)
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
|
||||
proxy_pass(stream.into_inner(), node.stream, &node_info.aux).await
|
||||
// Before proxy passing, forward to compute whatever data is left in the
|
||||
// PqStream input buffer. Normally there is none, but our serverless npm
|
||||
// driver in pipeline mode sends startup, password and first query
|
||||
// immediately after opening the connection.
|
||||
let (stream, read_buf) = stream.into_inner();
|
||||
node.stream.write_all(&read_buf).await?;
|
||||
proxy_pass(stream, node.stream, &node_info.aux).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::error::UserFacingError;
|
||||
use anyhow::bail;
|
||||
use bytes::BytesMut;
|
||||
use pin_project_lite::pin_project;
|
||||
use pq_proto::framed::{ConnectionError, Framed};
|
||||
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};
|
||||
@@ -27,8 +28,8 @@ impl<S> PqStream<S> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract the underlying stream.
|
||||
pub fn into_inner(self) -> S {
|
||||
/// Extract the underlying stream and read buffer.
|
||||
pub fn into_inner(self) -> (S, BytesMut) {
|
||||
self.framed.into_inner()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user