mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-10 03:40:37 +00:00
Compare commits
2 Commits
persistent
...
asher/ws2s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ccaf69e14 | ||
|
|
fcc6d52221 |
68
Cargo.lock
generated
68
Cargo.lock
generated
@@ -1448,6 +1448,19 @@ dependencies = [
|
|||||||
"tokio-native-tls",
|
"tokio-native-tls",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hyper-tungstenite"
|
||||||
|
version = "0.8.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "36692e7f740cd10fbe3f84f7cb7bfec2a71f929e72f97c19824d3f7f45aeec9b"
|
||||||
|
dependencies = [
|
||||||
|
"hyper",
|
||||||
|
"pin-project",
|
||||||
|
"tokio",
|
||||||
|
"tokio-tungstenite",
|
||||||
|
"tungstenite",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ident_case"
|
name = "ident_case"
|
||||||
version = "1.0.1"
|
version = "1.0.1"
|
||||||
@@ -2381,6 +2394,7 @@ dependencies = [
|
|||||||
"hex",
|
"hex",
|
||||||
"hmac 0.12.1",
|
"hmac 0.12.1",
|
||||||
"hyper",
|
"hyper",
|
||||||
|
"hyper-tungstenite",
|
||||||
"itertools",
|
"itertools",
|
||||||
"md5",
|
"md5",
|
||||||
"metrics",
|
"metrics",
|
||||||
@@ -2389,6 +2403,7 @@ dependencies = [
|
|||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"rand",
|
"rand",
|
||||||
"rcgen",
|
"rcgen",
|
||||||
|
"regex",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"routerify",
|
"routerify",
|
||||||
"rstest",
|
"rstest",
|
||||||
@@ -2407,6 +2422,7 @@ dependencies = [
|
|||||||
"url",
|
"url",
|
||||||
"utils",
|
"utils",
|
||||||
"uuid",
|
"uuid",
|
||||||
|
"webpki-roots",
|
||||||
"workspace_hack",
|
"workspace_hack",
|
||||||
"x509-parser",
|
"x509-parser",
|
||||||
]
|
]
|
||||||
@@ -3060,6 +3076,17 @@ dependencies = [
|
|||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sha-1"
|
||||||
|
version = "0.10.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"cpufeatures",
|
||||||
|
"digest 0.10.3",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sha2"
|
name = "sha2"
|
||||||
version = "0.9.9"
|
version = "0.9.9"
|
||||||
@@ -3534,6 +3561,18 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-tungstenite"
|
||||||
|
version = "0.17.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"log",
|
||||||
|
"tokio",
|
||||||
|
"tungstenite",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-util"
|
name = "tokio-util"
|
||||||
version = "0.7.3"
|
version = "0.7.3"
|
||||||
@@ -3745,6 +3784,25 @@ version = "0.2.3"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
|
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tungstenite"
|
||||||
|
version = "0.17.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0"
|
||||||
|
dependencies = [
|
||||||
|
"base64",
|
||||||
|
"byteorder",
|
||||||
|
"bytes",
|
||||||
|
"http",
|
||||||
|
"httparse",
|
||||||
|
"log",
|
||||||
|
"rand",
|
||||||
|
"sha-1",
|
||||||
|
"thiserror",
|
||||||
|
"url",
|
||||||
|
"utf-8",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "typenum"
|
name = "typenum"
|
||||||
version = "1.15.0"
|
version = "1.15.0"
|
||||||
@@ -3808,6 +3866,12 @@ dependencies = [
|
|||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "utf-8"
|
||||||
|
version = "0.7.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "utils"
|
name = "utils"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -4032,9 +4096,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "webpki-roots"
|
name = "webpki-roots"
|
||||||
version = "0.22.4"
|
version = "0.22.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f1c760f0d366a6c24a02ed7816e23e691f5d92291f94d15e836006fd11b04daf"
|
checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"webpki",
|
"webpki",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -16,12 +16,14 @@ hashbrown = "0.12"
|
|||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
hmac = "0.12.1"
|
hmac = "0.12.1"
|
||||||
hyper = "0.14"
|
hyper = "0.14"
|
||||||
|
hyper-tungstenite = "0.8.1"
|
||||||
itertools = "0.10.3"
|
itertools = "0.10.3"
|
||||||
md5 = "0.7.0"
|
md5 = "0.7.0"
|
||||||
once_cell = "1.13.0"
|
once_cell = "1.13.0"
|
||||||
parking_lot = "0.12"
|
parking_lot = "0.12"
|
||||||
pin-project-lite = "0.2.7"
|
pin-project-lite = "0.2.7"
|
||||||
rand = "0.8.3"
|
rand = "0.8.3"
|
||||||
|
regex = "1.4.5"
|
||||||
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
|
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
|
||||||
routerify = "3"
|
routerify = "3"
|
||||||
rustls = "0.20.0"
|
rustls = "0.20.0"
|
||||||
@@ -37,6 +39,7 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", re
|
|||||||
tokio-rustls = "0.23.0"
|
tokio-rustls = "0.23.0"
|
||||||
url = "2.2.2"
|
url = "2.2.2"
|
||||||
uuid = { version = "0.8.2", features = ["v4", "serde"]}
|
uuid = { version = "0.8.2", features = ["v4", "serde"]}
|
||||||
|
webpki-roots = "0.22.5"
|
||||||
x509-parser = "0.13.2"
|
x509-parser = "0.13.2"
|
||||||
|
|
||||||
utils = { path = "../libs/utils" }
|
utils = { path = "../libs/utils" }
|
||||||
|
|||||||
@@ -1,15 +1,122 @@
|
|||||||
use anyhow::anyhow;
|
use std::convert::Infallible;
|
||||||
use hyper::{Body, Request, Response, StatusCode};
|
|
||||||
use std::net::TcpListener;
|
use std::net::TcpListener;
|
||||||
|
|
||||||
|
use anyhow::anyhow;
|
||||||
|
use futures::{sink::SinkExt, stream::StreamExt};
|
||||||
|
use hyper::{Body, Request, Response, StatusCode};
|
||||||
|
use hyper_tungstenite::tungstenite::Message;
|
||||||
|
use hyper_tungstenite::HyperWebsocket;
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService};
|
use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService};
|
||||||
|
|
||||||
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||||
json_response(StatusCode::OK, "")
|
json_response(StatusCode::OK, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn ws_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||||
|
// fetch query param
|
||||||
|
let endpoint = request.uri().query().and_then(|v| {
|
||||||
|
url::form_urlencoded::parse(v.as_bytes())
|
||||||
|
.into_owned()
|
||||||
|
.map(|(_name, value)| value)
|
||||||
|
.next()
|
||||||
|
});
|
||||||
|
|
||||||
|
// Check if the request is a websocket upgrade request.
|
||||||
|
if hyper_tungstenite::is_upgrade_request(&request) {
|
||||||
|
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
|
||||||
|
.map_err(|e| ApiError::BadRequest(e.into()))?;
|
||||||
|
|
||||||
|
// forward somewhere, if asked
|
||||||
|
if let Some(endpoint) = endpoint {
|
||||||
|
println!("forwarding to {:?}", endpoint);
|
||||||
|
let neon_sock = TcpStream::connect(endpoint)
|
||||||
|
.await
|
||||||
|
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||||
|
|
||||||
|
// Spawn a task to handle the websocket connection.
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = serve_websocket(websocket, neon_sock).await {
|
||||||
|
println!("Error in websocket connection: {:?}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Return the response so the spawned future can continue.
|
||||||
|
return Ok(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
panic!("ffewfwe");
|
||||||
|
} else {
|
||||||
|
json_response(StatusCode::OK, "hi")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn serve_websocket(
|
||||||
|
websocket: HyperWebsocket,
|
||||||
|
mut neon_sock: TcpStream,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut websocket = websocket.await?;
|
||||||
|
let mut buf = [0u8; 8192];
|
||||||
|
|
||||||
|
println!("serving ws");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(message) = websocket.next() => {
|
||||||
|
match message? {
|
||||||
|
Message::Text(msg) => {
|
||||||
|
println!("Received text message: {}", msg);
|
||||||
|
}
|
||||||
|
Message::Binary(msg) => {
|
||||||
|
println!("Received binary message: {:02X?}", msg);
|
||||||
|
neon_sock.write_all(&msg).await?;
|
||||||
|
neon_sock.flush().await?;
|
||||||
|
println!("sent that binary msg");
|
||||||
|
}
|
||||||
|
Message::Ping(msg) => {
|
||||||
|
// No need to send a reply: tungstenite takes care of this for you.
|
||||||
|
println!("Received ping message: {:02X?}", msg);
|
||||||
|
}
|
||||||
|
Message::Pong(msg) => {
|
||||||
|
println!("Received pong message: {:02X?}", msg);
|
||||||
|
}
|
||||||
|
Message::Close(msg) => {
|
||||||
|
// No need to send a reply: tungstenite takes care of this for you.
|
||||||
|
if let Some(msg) = &msg {
|
||||||
|
println!(
|
||||||
|
"Received close message with code {} and message: {}",
|
||||||
|
msg.code, msg.reason
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
println!("Received close message");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Message::Frame(_msg) => {
|
||||||
|
unreachable!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
res = neon_sock.read(&mut buf) => {
|
||||||
|
let res = res?;
|
||||||
|
if res == 0 {
|
||||||
|
anyhow::bail!("read 0 from pg");
|
||||||
|
}
|
||||||
|
println!("Received binary message from pg: {:02X?}", &buf[0..res]);
|
||||||
|
websocket
|
||||||
|
.send(Message::binary(&buf[0..res]))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
|
fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
|
||||||
let router = endpoint::make_router();
|
let router = endpoint::make_router();
|
||||||
router.get("/v1/status", status_handler)
|
router
|
||||||
|
.get("/", ws_handler)
|
||||||
|
.get("/v1/status", status_handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> {
|
pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> {
|
||||||
@@ -25,3 +132,16 @@ pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn ws_thread_main(ws_listener: TcpListener) -> anyhow::Result<()> {
|
||||||
|
scopeguard::defer! {
|
||||||
|
println!("ws has shut down");
|
||||||
|
}
|
||||||
|
|
||||||
|
hyper::Server::from_tcp(ws_listener)?
|
||||||
|
.serve(hyper::service::make_service_fn(|_connection| async {
|
||||||
|
Ok::<_, Infallible>(hyper::service::service_fn(ws_handler))
|
||||||
|
}))
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
@@ -147,6 +147,10 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
println!("Starting http on {}", http_address);
|
println!("Starting http on {}", http_address);
|
||||||
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
|
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
|
||||||
|
|
||||||
|
let ws_address: SocketAddr = "0.0.0.0:8000".parse()?;
|
||||||
|
println!("Starting ws on {}", ws_address);
|
||||||
|
let ws_listener = TcpListener::bind(ws_address).await?.into_std()?;
|
||||||
|
|
||||||
println!("Starting mgmt on {}", mgmt_address);
|
println!("Starting mgmt on {}", mgmt_address);
|
||||||
let mgmt_listener = TcpListener::bind(mgmt_address).await?.into_std()?;
|
let mgmt_listener = TcpListener::bind(mgmt_address).await?.into_std()?;
|
||||||
|
|
||||||
@@ -155,6 +159,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
let tasks = [
|
let tasks = [
|
||||||
tokio::spawn(http::server::thread_main(http_listener)),
|
tokio::spawn(http::server::thread_main(http_listener)),
|
||||||
|
tokio::spawn(http::server::ws_thread_main(ws_listener)),
|
||||||
tokio::spawn(proxy::thread_main(config, proxy_listener)),
|
tokio::spawn(proxy::thread_main(config, proxy_listener)),
|
||||||
tokio::task::spawn_blocking(move || mgmt::thread_main(mgmt_listener)),
|
tokio::task::spawn_blocking(move || mgmt::thread_main(mgmt_listener)),
|
||||||
]
|
]
|
||||||
|
|||||||
Reference in New Issue
Block a user