mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 21:12:55 +00:00
[proxy] Fix possible unsoundness in the websocket machinery (#3569)
This PR replaces the ill-advised `unsafe Sync` impl with a de-facto standard way to solve the underlying problem. TLDR: - tokio::task::spawn requires future to be Send - ∀t. (t : Sync) <=> (&t : Send) - ∀t. (t : Send + !Sync) => (&t : !Send)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2901,6 +2901,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"socket2",
|
||||
"sync_wrapper",
|
||||
"thiserror",
|
||||
"tls-listener",
|
||||
"tokio",
|
||||
|
||||
@@ -69,7 +69,6 @@ once_cell = "1.13"
|
||||
opentelemetry = "0.18.0"
|
||||
opentelemetry-otlp = { version = "0.11.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.10.0"
|
||||
tracing-opentelemetry = "0.18.0"
|
||||
parking_lot = "0.12"
|
||||
pin-project-lite = "0.2"
|
||||
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
|
||||
@@ -93,6 +92,7 @@ socket2 = "0.4.4"
|
||||
strum = "0.24"
|
||||
strum_macros = "0.24"
|
||||
svg_fmt = "0.4.1"
|
||||
sync_wrapper = "0.1.2"
|
||||
tar = "0.4"
|
||||
thiserror = "1.0"
|
||||
tls-listener = { version = "0.6", features = ["rustls", "hyper-h1"] }
|
||||
@@ -105,6 +105,7 @@ toml = "0.5"
|
||||
toml_edit = { version = "0.17", features = ["easy"] }
|
||||
tonic = {version = "0.8", features = ["tls", "tls-roots"]}
|
||||
tracing = "0.1"
|
||||
tracing-opentelemetry = "0.18.0"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
url = "2.2"
|
||||
uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
|
||||
@@ -43,6 +43,7 @@ serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
sha2.workspace = true
|
||||
socket2.workspace = true
|
||||
sync_wrapper.workspace = true
|
||||
thiserror.workspace = true
|
||||
tls-listener.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
|
||||
@@ -1,161 +1,136 @@
|
||||
use crate::{
|
||||
cancellation::CancelMap, config::ProxyConfig, error::io_error, proxy::handle_ws_client,
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
use hyper::server::accept;
|
||||
use hyper::server::conn::AddrIncoming;
|
||||
use hyper::upgrade::Upgraded;
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use hyper_tungstenite::{tungstenite, WebSocketStream};
|
||||
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket};
|
||||
use hyper::{
|
||||
server::{accept, conn::AddrIncoming},
|
||||
upgrade::Upgraded,
|
||||
Body, Request, Response, StatusCode,
|
||||
};
|
||||
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
|
||||
use pin_project_lite::pin_project;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use std::convert::Infallible;
|
||||
use std::future::ready;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{ready, Context, Poll};
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
future::ready,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{ready, Context, Poll},
|
||||
};
|
||||
use tls_listener::TlsListener;
|
||||
|
||||
use tokio::io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
|
||||
|
||||
use tokio::{
|
||||
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
|
||||
net::TcpListener,
|
||||
};
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
use utils::http::{error::ApiError, json::json_response};
|
||||
|
||||
use crate::cancellation::CancelMap;
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::proxy::handle_ws_client;
|
||||
// TODO: use `std::sync::Exclusive` once it's stabilized.
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/98407.
|
||||
use sync_wrapper::SyncWrapper;
|
||||
|
||||
pin_project! {
|
||||
/// This is a wrapper around a WebSocketStream that implements AsyncRead and AsyncWrite.
|
||||
pub struct WebSocketRW {
|
||||
/// This is a wrapper around a [`WebSocketStream`] that
|
||||
/// implements [`AsyncRead`] and [`AsyncWrite`].
|
||||
pub struct WebSocketRw {
|
||||
#[pin]
|
||||
stream: WebSocketStream<Upgraded>,
|
||||
chunk: Option<bytes::Bytes>,
|
||||
stream: SyncWrapper<WebSocketStream<Upgraded>>,
|
||||
bytes: Bytes,
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: explain why this is safe or try to remove `unsafe impl`.
|
||||
unsafe impl Sync for WebSocketRW {}
|
||||
|
||||
impl WebSocketRW {
|
||||
impl WebSocketRw {
|
||||
pub fn new(stream: WebSocketStream<Upgraded>) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
chunk: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn has_chunk(&self) -> bool {
|
||||
if let Some(ref chunk) = self.chunk {
|
||||
chunk.remaining() > 0
|
||||
} else {
|
||||
false
|
||||
stream: stream.into(),
|
||||
bytes: Bytes::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ws_err_into(e: tungstenite::Error) -> io::Error {
|
||||
io::Error::new(io::ErrorKind::Other, e.to_string())
|
||||
}
|
||||
|
||||
impl AsyncWrite for WebSocketRW {
|
||||
impl AsyncWrite for WebSocketRw {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
let mut this = self.project();
|
||||
match this.stream.as_mut().poll_ready(cx) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
if let Err(e) = this
|
||||
.stream
|
||||
.as_mut()
|
||||
.start_send(Message::Binary(buf.to_vec()))
|
||||
{
|
||||
Poll::Ready(Err(ws_err_into(e)))
|
||||
} else {
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
}
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Err(ws_err_into(e))),
|
||||
Poll::Pending => {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let mut stream = self.project().stream.get_pin_mut();
|
||||
|
||||
ready!(stream.as_mut().poll_ready(cx).map_err(io_error))?;
|
||||
match stream.as_mut().start_send(Message::Binary(buf.into())) {
|
||||
Ok(()) => Poll::Ready(Ok(buf.len())),
|
||||
Err(e) => Poll::Ready(Err(io_error(e))),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
self.project().stream.poll_flush(cx).map_err(ws_err_into)
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
let stream = self.project().stream.get_pin_mut();
|
||||
stream.poll_flush(cx).map_err(io_error)
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
self.project().stream.poll_close(cx).map_err(ws_err_into)
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
let stream = self.project().stream.get_pin_mut();
|
||||
stream.poll_close(cx).map_err(io_error)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for WebSocketRW {
|
||||
impl AsyncRead for WebSocketRw {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
if buf.remaining() == 0 {
|
||||
return Poll::Ready(Ok(()));
|
||||
if buf.remaining() > 0 {
|
||||
let bytes = ready!(self.as_mut().poll_fill_buf(cx))?;
|
||||
let len = std::cmp::min(bytes.len(), buf.remaining());
|
||||
buf.put_slice(&bytes[..len]);
|
||||
self.consume(len);
|
||||
}
|
||||
|
||||
let inner_buf = match ready!(self.as_mut().poll_fill_buf(cx)) {
|
||||
Ok(buf) => buf,
|
||||
Err(err) => return Poll::Ready(Err(err)),
|
||||
};
|
||||
let len = std::cmp::min(inner_buf.len(), buf.remaining());
|
||||
buf.put_slice(&inner_buf[..len]);
|
||||
|
||||
self.consume(len);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncBufRead for WebSocketRW {
|
||||
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
|
||||
impl AsyncBufRead for WebSocketRw {
|
||||
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
|
||||
// Please refer to poll_fill_buf's documentation.
|
||||
const EOF: Poll<io::Result<&[u8]>> = Poll::Ready(Ok(&[]));
|
||||
|
||||
let mut this = self.project();
|
||||
loop {
|
||||
if self.as_mut().has_chunk() {
|
||||
let buf = self.project().chunk.as_ref().unwrap().chunk();
|
||||
return Poll::Ready(Ok(buf));
|
||||
} else {
|
||||
match ready!(self.as_mut().project().stream.poll_next(cx)) {
|
||||
Some(Ok(message)) => match message {
|
||||
Message::Text(_) => {}
|
||||
Message::Binary(chunk) => {
|
||||
*self.as_mut().project().chunk = Some(Bytes::from(chunk));
|
||||
}
|
||||
Message::Ping(_) => {
|
||||
// No need to send a reply: tungstenite takes care of this for you.
|
||||
}
|
||||
Message::Pong(_) => {}
|
||||
Message::Close(_) => {
|
||||
// No need to send a reply: tungstenite takes care of this for you.
|
||||
return Poll::Ready(Ok(&[]));
|
||||
}
|
||||
Message::Frame(_) => {
|
||||
unreachable!();
|
||||
}
|
||||
},
|
||||
Some(Err(err)) => return Poll::Ready(Err(ws_err_into(err))),
|
||||
None => return Poll::Ready(Ok(&[])),
|
||||
}
|
||||
if !this.bytes.chunk().is_empty() {
|
||||
let chunk = (*this.bytes).chunk();
|
||||
return Poll::Ready(Ok(chunk));
|
||||
}
|
||||
|
||||
let res = ready!(this.stream.as_mut().get_pin_mut().poll_next(cx));
|
||||
match res.transpose().map_err(io_error)? {
|
||||
Some(message) => match message {
|
||||
Message::Ping(_) => {}
|
||||
Message::Pong(_) => {}
|
||||
Message::Text(text) => {
|
||||
// We expect to see only binary messages.
|
||||
let error = "unexpected text message in the websocket";
|
||||
warn!(length = text.len(), error);
|
||||
return Poll::Ready(Err(io_error(error)));
|
||||
}
|
||||
Message::Frame(_) => {
|
||||
// This case is impossible according to Frame's doc.
|
||||
panic!("unexpected raw frame in the websocket");
|
||||
}
|
||||
Message::Binary(chunk) => {
|
||||
assert!(this.bytes.is_empty());
|
||||
*this.bytes = Bytes::from(chunk);
|
||||
}
|
||||
Message::Close(_) => return EOF,
|
||||
},
|
||||
None => return EOF,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn consume(self: Pin<&mut Self>, amt: usize) {
|
||||
if amt > 0 {
|
||||
self.project()
|
||||
.chunk
|
||||
.as_mut()
|
||||
.expect("No chunk present")
|
||||
.advance(amt);
|
||||
}
|
||||
fn consume(self: Pin<&mut Self>, amount: usize) {
|
||||
self.project().bytes.advance(amount);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,7 +146,7 @@ async fn serve_websocket(
|
||||
config,
|
||||
cancel_map,
|
||||
session_id,
|
||||
WebSocketRW::new(websocket),
|
||||
WebSocketRw::new(websocket),
|
||||
hostname,
|
||||
)
|
||||
.await?;
|
||||
@@ -199,7 +174,7 @@ async fn ws_handler(
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = serve_websocket(websocket, config, &cancel_map, session_id, host).await
|
||||
{
|
||||
error!("error in websocket connection: {:?}", e);
|
||||
error!("error in websocket connection: {e:?}");
|
||||
}
|
||||
});
|
||||
|
||||
@@ -231,7 +206,7 @@ pub async fn task_main(
|
||||
|
||||
let tls_listener = TlsListener::new(tls_acceptor, addr_incoming).filter(|conn| {
|
||||
if let Err(err) = conn {
|
||||
error!("failed to accept TLS connection for websockets: {:?}", err);
|
||||
error!("failed to accept TLS connection for websockets: {err:?}");
|
||||
ready(false)
|
||||
} else {
|
||||
ready(true)
|
||||
|
||||
Reference in New Issue
Block a user