impl compute connect

This commit is contained in:
Folke Behrens
2024-09-13 09:49:11 +01:00
parent 34696381c5
commit e06aa4b91d

View File

@@ -1,6 +1,6 @@
use std::{
convert::Infallible,
net::{IpAddr, SocketAddr},
net::SocketAddr,
sync::{Arc, Mutex},
time::Duration,
};
@@ -257,7 +257,7 @@ impl PglbConn<Start> {
.await?
.handle_auth_passthrough()
.await?
.handle_connect_connect()
.handle_compute_connect()
.await?
.handle_compute_passthrough()
.await
@@ -499,15 +499,85 @@ struct ComputeConnect {
}
impl PglbConn<ComputeConnect> {
async fn handle_connect_connect(self) -> Result<PglbConn<ComputePassthrough>> {
todo!()
async fn handle_compute_connect(self) -> Result<PglbConn<ComputePassthrough>> {
println!("connecting to compute...");
let ComputeConnect {
client_stream,
mut auth_stream,
compute_socket,
} = self.state;
let compute_stream = TcpStream::connect(compute_socket).await?;
println!("connected to compute");
match compute_stream.set_nodelay(true) {
Ok(()) => {}
Err(e) => {
bail!("socket option error: {e}");
}
};
let mut compute_stream = Framed::new(
compute_stream,
PgRawCodec {
start_or_ssl_request: true,
},
);
loop {
select! {
msg = auth_stream.next() => {
let Some(msg) = msg else {
bail!("auth proxy disconnected");
};
match msg? {
PglbMessage::Postgres(mut payload) => {
let Some(msg) = PgRawMessage::decode(&mut payload, false)? else {
bail!("auth proxy sent invalid message");
};
compute_stream.send(msg).await?;
}
PglbMessage::Control(PglbControlMessage::ConnectionInitiated(_)) => {
bail!("auth proxy sent unexpected message");
}
PglbMessage::Control(PglbControlMessage::ConnectToCompute { .. }) => {
bail!("auth proxy sent unexpected message");
}
PglbMessage::Control(PglbControlMessage::ComputeEstablish) => {
println!("establish");
return Ok(PglbConn {
inner: self.inner,
state: ComputePassthrough {
client_stream,
compute_stream,
},
});
}
}
}
msg = compute_stream.next() => {
let Some(msg) = msg else {
bail!("compute disconnected");
};
match msg? {
PgRawMessage::SslRequest => bail!("protocol violation"),
msg => {
let mut buf = BytesMut::new();
msg.encode(&mut buf)?;
auth_stream.send(proxy::PglbMessage::Postgres(
buf
)).await?;
}
}
}
}
}
}
}
#[derive(Debug)]
struct ComputePassthrough {
client_stream: TlsStream<TcpStream>,
compute_conn: (),
client_stream: Framed<TlsStream<TcpStream>, PgRawCodec>,
compute_stream: Framed<TcpStream, PgRawCodec>,
}
impl PglbConn<ComputePassthrough> {