mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
dummy proxy works
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
use regex::Regex;
|
||||
use rustls::{OwnedTrustAnchor, RootCertStore};
|
||||
use std::{net::TcpListener, sync::Arc};
|
||||
use std::convert::Infallible;
|
||||
use std::net::TcpListener;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use futures::{sink::SinkExt, stream::StreamExt};
|
||||
@@ -10,40 +9,45 @@ use hyper_tungstenite::HyperWebsocket;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::TcpStream;
|
||||
use utils::http::{
|
||||
endpoint, error::ApiError, json::json_response, request::parse_request_param, RouterBuilder,
|
||||
RouterService,
|
||||
};
|
||||
use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService};
|
||||
|
||||
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(StatusCode::OK, "")
|
||||
}
|
||||
|
||||
async fn ws_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
// fetch query param
|
||||
let endpoint = request.uri().query().and_then(|v| {
|
||||
url::form_urlencoded::parse(v.as_bytes())
|
||||
.into_owned()
|
||||
.map(|(_name, value)| value)
|
||||
.next()
|
||||
});
|
||||
|
||||
// Check if the request is a websocket upgrade request.
|
||||
if hyper_tungstenite::is_upgrade_request(&request) {
|
||||
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
|
||||
.map_err(|e| ApiError::BadRequest(e.into()))?;
|
||||
|
||||
let connstr: String = parse_request_param(&request, "connstr")?;
|
||||
let r = Regex::new(r"@(.*:(\d+)").unwrap();
|
||||
let caps = r.captures(&connstr).unwrap();
|
||||
let endpoint: String = caps.get(1).unwrap().as_str().to_owned();
|
||||
// forward somewhere, if asked
|
||||
if let Some(endpoint) = endpoint {
|
||||
println!("forwarding to {:?}", endpoint);
|
||||
let neon_sock = TcpStream::connect(endpoint)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
// connect to itself, but through remote endpoint, lol
|
||||
let neon_sock = TcpStream::connect(endpoint)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
// Spawn a task to handle the websocket connection.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = serve_websocket(websocket, neon_sock).await {
|
||||
println!("Error in websocket connection: {:?}", e);
|
||||
}
|
||||
});
|
||||
|
||||
// Spawn a task to handle the websocket connection.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = serve_websocket(websocket, neon_sock).await {
|
||||
eprintln!("Error in websocket connection: {}", e);
|
||||
}
|
||||
});
|
||||
// Return the response so the spawned future can continue.
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
// Return the response so the spawned future can continue.
|
||||
Ok(response)
|
||||
panic!("ffewfwe");
|
||||
} else {
|
||||
json_response(StatusCode::OK, "hi")
|
||||
}
|
||||
@@ -56,58 +60,63 @@ async fn serve_websocket(
|
||||
let mut websocket = websocket.await?;
|
||||
let mut buf = [0u8; 8192];
|
||||
|
||||
tokio::select! {
|
||||
Some(message) = websocket.next() => {
|
||||
match message? {
|
||||
Message::Text(msg) => {
|
||||
println!("Received text message: {}", msg);
|
||||
}
|
||||
Message::Binary(msg) => {
|
||||
println!("Received binary message: {:02X?}", msg);
|
||||
neon_sock.write_all(&msg).await?;
|
||||
websocket
|
||||
.send(Message::binary(b"Thank you, come again.".to_vec()))
|
||||
.await?;
|
||||
}
|
||||
Message::Ping(msg) => {
|
||||
// No need to send a reply: tungstenite takes care of this for you.
|
||||
println!("Received ping message: {:02X?}", msg);
|
||||
}
|
||||
Message::Pong(msg) => {
|
||||
println!("Received pong message: {:02X?}", msg);
|
||||
}
|
||||
Message::Close(msg) => {
|
||||
// No need to send a reply: tungstenite takes care of this for you.
|
||||
if let Some(msg) = &msg {
|
||||
println!(
|
||||
"Received close message with code {} and message: {}",
|
||||
msg.code, msg.reason
|
||||
);
|
||||
} else {
|
||||
println!("Received close message");
|
||||
println!("serving ws");
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(message) = websocket.next() => {
|
||||
match message? {
|
||||
Message::Text(msg) => {
|
||||
println!("Received text message: {}", msg);
|
||||
}
|
||||
Message::Binary(msg) => {
|
||||
println!("Received binary message: {:02X?}", msg);
|
||||
neon_sock.write_all(&msg).await?;
|
||||
neon_sock.flush().await?;
|
||||
println!("sent that binary msg");
|
||||
}
|
||||
Message::Ping(msg) => {
|
||||
// No need to send a reply: tungstenite takes care of this for you.
|
||||
println!("Received ping message: {:02X?}", msg);
|
||||
}
|
||||
Message::Pong(msg) => {
|
||||
println!("Received pong message: {:02X?}", msg);
|
||||
}
|
||||
Message::Close(msg) => {
|
||||
// No need to send a reply: tungstenite takes care of this for you.
|
||||
if let Some(msg) = &msg {
|
||||
println!(
|
||||
"Received close message with code {} and message: {}",
|
||||
msg.code, msg.reason
|
||||
);
|
||||
} else {
|
||||
println!("Received close message");
|
||||
}
|
||||
}
|
||||
Message::Frame(_msg) => {
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
Message::Frame(_msg) => {
|
||||
unreachable!();
|
||||
},
|
||||
res = neon_sock.read(&mut buf) => {
|
||||
let res = res?;
|
||||
if res == 0 {
|
||||
anyhow::bail!("read 0 from pg");
|
||||
}
|
||||
println!("Received binary message from pg: {:02X?}", &buf[0..res]);
|
||||
websocket
|
||||
.send(Message::binary(&buf[0..res]))
|
||||
.await?;
|
||||
}
|
||||
},
|
||||
res = neon_sock.read(&mut buf) => {
|
||||
let res = res?;
|
||||
websocket
|
||||
.send(Message::binary(&buf[0..res]))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let router = endpoint::make_router();
|
||||
router
|
||||
.get("/", ws_handler)
|
||||
.get("/v1/status", status_handler)
|
||||
.get("/v1/ws", ws_handler)
|
||||
}
|
||||
|
||||
pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> {
|
||||
@@ -123,3 +132,16 @@ pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn ws_thread_main(ws_listener: TcpListener) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
println!("ws has shut down");
|
||||
}
|
||||
|
||||
hyper::Server::from_tcp(ws_listener)?
|
||||
.serve(hyper::service::make_service_fn(|_connection| async {
|
||||
Ok::<_, Infallible>(hyper::service::service_fn(ws_handler))
|
||||
}))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -147,6 +147,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
println!("Starting http on {}", http_address);
|
||||
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
|
||||
|
||||
let ws_address: SocketAddr = "0.0.0.0:8000".parse()?;
|
||||
println!("Starting ws on {}", ws_address);
|
||||
let ws_listener = TcpListener::bind(ws_address).await?.into_std()?;
|
||||
|
||||
println!("Starting mgmt on {}", mgmt_address);
|
||||
let mgmt_listener = TcpListener::bind(mgmt_address).await?.into_std()?;
|
||||
|
||||
@@ -155,6 +159,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let tasks = [
|
||||
tokio::spawn(http::server::thread_main(http_listener)),
|
||||
tokio::spawn(http::server::ws_thread_main(ws_listener)),
|
||||
tokio::spawn(proxy::thread_main(config, proxy_listener)),
|
||||
tokio::task::spawn_blocking(move || mgmt::thread_main(mgmt_listener)),
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user