From 7ccaf69e144dc853c30dbf75bad4d351c37c45cf Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 30 Sep 2022 11:04:13 +0300 Subject: [PATCH] dummy proxy works --- proxy/src/http/server.rs | 148 ++++++++++++++++++++++----------------- proxy/src/main.rs | 5 ++ 2 files changed, 90 insertions(+), 63 deletions(-) diff --git a/proxy/src/http/server.rs b/proxy/src/http/server.rs index 06b7c73789..a5e1c42a9b 100644 --- a/proxy/src/http/server.rs +++ b/proxy/src/http/server.rs @@ -1,6 +1,5 @@ -use regex::Regex; -use rustls::{OwnedTrustAnchor, RootCertStore}; -use std::{net::TcpListener, sync::Arc}; +use std::convert::Infallible; +use std::net::TcpListener; use anyhow::anyhow; use futures::{sink::SinkExt, stream::StreamExt}; @@ -10,40 +9,45 @@ use hyper_tungstenite::HyperWebsocket; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; -use utils::http::{ - endpoint, error::ApiError, json::json_response, request::parse_request_param, RouterBuilder, - RouterService, -}; +use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService}; async fn status_handler(_: Request) -> Result, ApiError> { json_response(StatusCode::OK, "") } async fn ws_handler(mut request: Request) -> Result, ApiError> { + // fetch query param + let endpoint = request.uri().query().and_then(|v| { + url::form_urlencoded::parse(v.as_bytes()) + .into_owned() + .map(|(_name, value)| value) + .next() + }); + // Check if the request is a websocket upgrade request. if hyper_tungstenite::is_upgrade_request(&request) { let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None) .map_err(|e| ApiError::BadRequest(e.into()))?; - let connstr: String = parse_request_param(&request, "connstr")?; - let r = Regex::new(r"@(.*:(\d+)").unwrap(); - let caps = r.captures(&connstr).unwrap(); - let endpoint: String = caps.get(1).unwrap().as_str().to_owned(); + // forward somewhere, if asked + if let Some(endpoint) = endpoint { + println!("forwarding to {:?}", endpoint); + let neon_sock = TcpStream::connect(endpoint) + .await + .map_err(|e| ApiError::InternalServerError(e.into()))?; - // connect to itself, but through remote endpoint, lol - let neon_sock = TcpStream::connect(endpoint) - .await - .map_err(|e| ApiError::InternalServerError(e.into()))?; + // Spawn a task to handle the websocket connection. + tokio::spawn(async move { + if let Err(e) = serve_websocket(websocket, neon_sock).await { + println!("Error in websocket connection: {:?}", e); + } + }); - // Spawn a task to handle the websocket connection. - tokio::spawn(async move { - if let Err(e) = serve_websocket(websocket, neon_sock).await { - eprintln!("Error in websocket connection: {}", e); - } - }); + // Return the response so the spawned future can continue. + return Ok(response); + } - // Return the response so the spawned future can continue. - Ok(response) + panic!("ffewfwe"); } else { json_response(StatusCode::OK, "hi") } @@ -56,58 +60,63 @@ async fn serve_websocket( let mut websocket = websocket.await?; let mut buf = [0u8; 8192]; - tokio::select! { - Some(message) = websocket.next() => { - match message? { - Message::Text(msg) => { - println!("Received text message: {}", msg); - } - Message::Binary(msg) => { - println!("Received binary message: {:02X?}", msg); - neon_sock.write_all(&msg).await?; - websocket - .send(Message::binary(b"Thank you, come again.".to_vec())) - .await?; - } - Message::Ping(msg) => { - // No need to send a reply: tungstenite takes care of this for you. - println!("Received ping message: {:02X?}", msg); - } - Message::Pong(msg) => { - println!("Received pong message: {:02X?}", msg); - } - Message::Close(msg) => { - // No need to send a reply: tungstenite takes care of this for you. - if let Some(msg) = &msg { - println!( - "Received close message with code {} and message: {}", - msg.code, msg.reason - ); - } else { - println!("Received close message"); + println!("serving ws"); + + loop { + tokio::select! { + Some(message) = websocket.next() => { + match message? { + Message::Text(msg) => { + println!("Received text message: {}", msg); + } + Message::Binary(msg) => { + println!("Received binary message: {:02X?}", msg); + neon_sock.write_all(&msg).await?; + neon_sock.flush().await?; + println!("sent that binary msg"); + } + Message::Ping(msg) => { + // No need to send a reply: tungstenite takes care of this for you. + println!("Received ping message: {:02X?}", msg); + } + Message::Pong(msg) => { + println!("Received pong message: {:02X?}", msg); + } + Message::Close(msg) => { + // No need to send a reply: tungstenite takes care of this for you. + if let Some(msg) = &msg { + println!( + "Received close message with code {} and message: {}", + msg.code, msg.reason + ); + } else { + println!("Received close message"); + } + } + Message::Frame(_msg) => { + unreachable!(); } } - Message::Frame(_msg) => { - unreachable!(); + }, + res = neon_sock.read(&mut buf) => { + let res = res?; + if res == 0 { + anyhow::bail!("read 0 from pg"); } + println!("Received binary message from pg: {:02X?}", &buf[0..res]); + websocket + .send(Message::binary(&buf[0..res])) + .await?; } - }, - res = neon_sock.read(&mut buf) => { - let res = res?; - websocket - .send(Message::binary(&buf[0..res])) - .await?; } } - - Ok(()) } fn make_router() -> RouterBuilder { let router = endpoint::make_router(); router + .get("/", ws_handler) .get("/v1/status", status_handler) - .get("/v1/ws", ws_handler) } pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> { @@ -123,3 +132,16 @@ pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> { Ok(()) } + +pub async fn ws_thread_main(ws_listener: TcpListener) -> anyhow::Result<()> { + scopeguard::defer! { + println!("ws has shut down"); + } + + hyper::Server::from_tcp(ws_listener)? + .serve(hyper::service::make_service_fn(|_connection| async { + Ok::<_, Infallible>(hyper::service::service_fn(ws_handler)) + })) + .await?; + Ok(()) +} diff --git a/proxy/src/main.rs b/proxy/src/main.rs index f2dc7425ba..6d92d22e85 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -147,6 +147,10 @@ async fn main() -> anyhow::Result<()> { println!("Starting http on {}", http_address); let http_listener = TcpListener::bind(http_address).await?.into_std()?; + let ws_address: SocketAddr = "0.0.0.0:8000".parse()?; + println!("Starting ws on {}", ws_address); + let ws_listener = TcpListener::bind(ws_address).await?.into_std()?; + println!("Starting mgmt on {}", mgmt_address); let mgmt_listener = TcpListener::bind(mgmt_address).await?.into_std()?; @@ -155,6 +159,7 @@ async fn main() -> anyhow::Result<()> { let tasks = [ tokio::spawn(http::server::thread_main(http_listener)), + tokio::spawn(http::server::ws_thread_main(ws_listener)), tokio::spawn(proxy::thread_main(config, proxy_listener)), tokio::task::spawn_blocking(move || mgmt::thread_main(mgmt_listener)), ]