Add SQL-over-HTTP endpoint to Proxy

This commit introduces an SQL-over-HTTP endpoint in the proxy, with a JSON
response structure resembling that of the node-postgres driver. Because it
uses plain HTTP POST, it achieves smaller amortized latencies in edge setups:
fewer round trips, and better reuse of open connections by the V8 engine.

This update involves several subtleties:
1. SQL injection protection: We employed the extended query protocol, modifying
   the rust-postgres driver to send queries in one round trip using the text
   protocol rather than binary, sidestepping issues like the one described
   in https://github.com/sfackler/rust-postgres/issues/1030.

2. Postgres type compatibility: As not all postgres types have binary
   representations (e.g., ACLs in pg_class), we adjusted rust-postgres to
   return results in the text protocol, which simplifies serialization and
   fixes queries that have text-only types in the response.

3. Data type conversion: Since JSON supports fewer data types than
   Postgres, we perform conversions where possible and pass all other types
   as strings (see the sketch after this list). Key conversions include:
   - postgres int2, int4, float4, float8 -> json number (NaN and Inf remain
     text)
   - postgres bool, null, text -> json bool, null, string
   - postgres array -> json array
   - postgres json and jsonb -> json object

4. Alignment with node-postgres: To facilitate integration with JS libraries,
   we've matched the response structure of node-postgres, returning command
   tags and column OIDs. Command tag capturing was added to the rust-postgres
   functionality as part of this change.
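
To make point 3 concrete, here is a minimal sketch of the conversion rules in
test form (assuming it lives next to `pg_text_to_json`, the helper this commit
introduces; the assertions mirror the unit tests added below):

```rust
use serde_json::json;
use tokio_postgres::types::Type;

#[test]
fn conversion_rules_sketch() {
    assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
    // int8 can exceed JS number precision, so it stays a string
    assert_eq!(pg_text_to_json(Some("42"), &Type::INT8).unwrap(), json!("42"));
    // JSON has no NaN/Infinity, so they stay strings
    assert_eq!(pg_text_to_json(Some("NaN"), &Type::FLOAT8).unwrap(), json!("NaN"));
    assert_eq!(pg_text_to_json(Some("t"), &Type::BOOL).unwrap(), json!(true));
    assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
    // arrays recurse element-wise
    assert_eq!(
        pg_text_to_json(Some("{1,2,3}"), &Type::INT4_ARRAY).unwrap(),
        json!([1, 2, 3])
    );
}
```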
Author: Stas Kelvich, 2023-04-14 19:41:02 +03:00
Committed by: Arthur Petukhovsky
Commit: dad3519351 (parent d75b4e0f16)
10 changed files with 909 additions and 65 deletions


@@ -1,6 +1,6 @@
# Proxy
-Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme and cluster routing method. Following backends are currently implemented:
+Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme and cluster routing method. Following routing backends are currently implemented:
* console
new SCRAM-based console API; uses SNI info to select the destination project (endpoint soon)
@@ -9,6 +9,90 @@ Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme a
* link
sends login link for all usernames
The proxy can also expose the following services to the external world:
* postgres protocol over TCP -- the usual postgres endpoint, compatible with
standard postgres drivers
* postgres protocol over WebSockets -- the same protocol tunneled over
websockets for environments where a raw TCP connection is not available. We
have our own client implementation that uses node-postgres and tunnels traffic
through websockets: https://github.com/neondatabase/serverless
* SQL over HTTP -- a service that accepts POST requests with SQL text over
HTTP and responds with JSON-serialised results.
## SQL over HTTP
Unlike the usual postgres protocol over TCP or WebSockets, a plain one-shot
HTTP request achieves smaller amortized latencies in edge setups due to fewer
round trips and better reuse of open connections by the V8 engine. Such an
endpoint can also be used directly, without any driver.
To play with it locally, one may start the proxy against a local postgres
installation (see the end of this page on how to generate certs with openssl):
```
./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444
```
With both postgres and the proxy running, you can send a SQL query:
```
curl -k -X POST 'https://proxy.localtest.me:4444/sql' \
-H 'Neon-Connection-String: postgres://stas:pass@proxy.localtest.me:4444/postgres' \
-H 'Content-Type: application/json' \
--data '{
"query":"SELECT $1::int[] as arr, $2::jsonb as obj, 42 as num",
"params":[ "{{1,2},{\"3\",4}}", {"key":"val", "ikey":4242}]
}' | jq
{
"command": "SELECT",
"fields": [
{ "dataTypeID": 1007, "name": "arr" },
{ "dataTypeID": 3802, "name": "obj" },
{ "dataTypeID": 23, "name": "num" }
],
"rowCount": 1,
"rows": [
{
"arr": [[1,2],[3,4]],
"num": 42,
"obj": {
"ikey": 4242,
"key": "val"
}
}
]
}
```
With the current approach we made the following design decisions:
1. SQL injection protection: We employed the extended query protocol, modifying
the rust-postgres driver to send queries in one round trip using the text
protocol rather than binary, sidestepping issues like the one described
in sfackler/rust-postgres#1030.
2. Postgres type compatibility: As not all postgres types have binary
representations (e.g., ACLs in pg_class), we adjusted rust-postgres to
return results in the text protocol, which simplifies serialization and
fixes queries that have text-only types in the response.
3. Data type conversion: Since JSON supports fewer data types than
Postgres, we perform conversions where possible and pass all other types as
strings. Key conversions include:
- postgres int2, int4, float4, float8 -> json number (NaN and Inf remain
text)
- postgres bool, null, text -> json bool, null, string
- postgres array -> json array
- postgres json and jsonb -> json object
4. Alignment with node-postgres: To facilitate integration with JS libraries,
we've matched the response structure of node-postgres, returning command tags
and column OIDs. Command tag capturing was added to the rust-postgres
functionality as part of this change.
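Because the endpoint is plain HTTP, no driver is required. As a minimal sketch,
here is how one might call it from Rust with `reqwest` (the URL and credentials
mirror the curl example above and are placeholders for a local setup;
`danger_accept_invalid_certs` is only acceptable against the local self-signed
certificate):
```rust
use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Accept the self-signed local certificate; never do this in production.
    let client = reqwest::Client::builder()
        .danger_accept_invalid_certs(true)
        .build()?;

    let resp: Value = client
        .post("https://proxy.localtest.me:4444/sql")
        .header(
            "Neon-Connection-String",
            "postgres://stas:pass@proxy.localtest.me:4444/postgres",
        )
        // Query text plus positional params, as in the curl example above.
        .json(&json!({
            "query": "SELECT $1::int4 + $2::int4 AS sum",
            "params": [1, 2],
        }))
        .send()
        .await?
        .json()
        .await?;

    // Expected shape: {"command":"SELECT","rowCount":1,"rows":[{"sum":3}],"fields":[...]}
    println!("{resp}");
    Ok(())
}
```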
## Using SNI-based routing on localhost
The proxy now determines the project name from the subdomain: a request to `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so I usually use `*.localtest.me`, which resolves to `127.0.0.1`. Now we can create a self-signed certificate and play with the proxy:


@@ -100,9 +100,10 @@ impl CertResolver {
is_default: bool,
) -> anyhow::Result<()> {
let priv_key = {
-let key_bytes = std::fs::read(key_path).context("TLS key file")?;
-let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
+let key_bytes = std::fs::read(key_path)
+.context(format!("Failed to read TLS keys at '{key_path}'"))?;
+let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
+.context(format!("Failed to parse TLS keys at '{key_path}'"))?;
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
keys.pop().map(rustls::PrivateKey).unwrap()


@@ -3,6 +3,7 @@
//! directly relying on deps like `reqwest` (think loose coupling).
pub mod server;
pub mod sql_over_http;
pub mod websocket;
pub use reqwest::{Request, Response, StatusCode};


@@ -0,0 +1,603 @@
use futures::pin_mut;
use futures::StreamExt;
use hyper::body::HttpBody;
use hyper::{Body, HeaderMap, Request};
use pq_proto::StartupMessageParams;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::Row;
use url::Url;
use crate::{auth, config::ProxyConfig, console};
#[derive(serde::Deserialize)]
struct QueryData {
query: String,
params: Vec<serde_json::Value>,
}
const APP_NAME: &str = "sql_over_http";
const MAX_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MB
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
//
// Convert json non-string types to strings, so that they can be passed to Postgres
// as parameters.
//
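// For example, params [42, "foo", [1, 2]] become the strings "42", "foo", "{1,2}".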
fn json_to_pg_text(json: Vec<Value>) -> Result<Vec<String>, serde_json::Error> {
json.iter()
.map(|value| {
match value {
Value::Null => serde_json::to_string(value),
Value::Bool(_) => serde_json::to_string(value),
Value::Number(_) => serde_json::to_string(value),
Value::Object(_) => serde_json::to_string(value),
// no need to escape
Value::String(s) => Ok(s.to_string()),
// special care for arrays
Value::Array(_) => json_array_to_pg_array(value),
}
})
.collect()
}
//
// Serialize a JSON array to a Postgres array. Unlike the top-level string
// params, strings inside the array need to be escaped. Postgres is okay with
// arrays of the form '{1,"2",3}'::int[], so we don't check that the array holds
// values of a single type, leaving that check to Postgres.
//
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
//
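// For example, ["a", ["b", 1]] serializes to {"a",{"b",1}}.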
fn json_array_to_pg_array(value: &Value) -> Result<String, serde_json::Error> {
match value {
// same
Value::Null => serde_json::to_string(value),
Value::Bool(_) => serde_json::to_string(value),
Value::Number(_) => serde_json::to_string(value),
Value::Object(_) => serde_json::to_string(value),
// now needs to be escaped, as it is part of the array
Value::String(_) => serde_json::to_string(value),
// recurse into array
Value::Array(arr) => {
let vals = arr
.iter()
.map(json_array_to_pg_array)
.collect::<Result<Vec<_>, _>>()?
.join(",");
Ok(format!("{{{}}}", vals))
}
}
}
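//
// Extract (username, dbname, hostname, password) from the Neon-Connection-String
// header, cross-checking the URL host against the SNI hostname and the Host header.
//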
fn get_conn_info(
headers: &HeaderMap,
sni_hostname: Option<String>,
) -> Result<(String, String, String, String), anyhow::Error> {
let connection_string = headers
.get("Neon-Connection-String")
.ok_or(anyhow::anyhow!("missing connection string"))?
.to_str()?;
let connection_url = Url::parse(connection_string)?;
let protocol = connection_url.scheme();
if protocol != "postgres" && protocol != "postgresql" {
return Err(anyhow::anyhow!(
"connection string must start with postgres: or postgresql:"
));
}
let mut url_path = connection_url
.path_segments()
.ok_or(anyhow::anyhow!("missing database name"))?;
let dbname = url_path
.next()
.ok_or(anyhow::anyhow!("invalid database name"))?;
let username = connection_url.username();
if username.is_empty() {
return Err(anyhow::anyhow!("missing username"));
}
let password = connection_url
.password()
.ok_or(anyhow::anyhow!("no password"))?;
// The TLS certificate is selected based on the SNI hostname, so if we got here
// we can be sure that the SNI hostname is set to one of the configured domain names.
let sni_hostname = sni_hostname.ok_or(anyhow::anyhow!("no SNI hostname set"))?;
let hostname = connection_url
.host_str()
.ok_or(anyhow::anyhow!("no host"))?;
let host_header = headers
.get("host")
.and_then(|h| h.to_str().ok())
.and_then(|h| h.split(':').next());
if hostname != sni_hostname {
return Err(anyhow::anyhow!("mismatched SNI hostname and hostname"));
} else if let Some(h) = host_header {
if h != hostname {
return Err(anyhow::anyhow!("mismatched host header and hostname"));
}
}
Ok((
username.to_owned(),
dbname.to_owned(),
hostname.to_owned(),
password.to_owned(),
))
}
// TODO: return different http error codes
pub async fn handle(
config: &'static ProxyConfig,
request: Request<Body>,
sni_hostname: Option<String>,
) -> anyhow::Result<Value> {
//
// Determine the destination and connection params
//
let headers = request.headers();
let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?;
let credential_params = StartupMessageParams::new([
("user", &username),
("database", &dbname),
("application_name", APP_NAME),
]);
//
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
//
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());
let creds = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names))
.transpose()?;
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
};
let node = creds.wake_compute(&extra).await?.expect("compute node info should be present");
let conf = node.value.config;
let port = *conf.get_ports().first().expect("no port");
let host = match conf.get_hosts().first().expect("no host") {
tokio_postgres::config::Host::Tcp(host) => host,
tokio_postgres::config::Host::Unix(_) => {
return Err(anyhow::anyhow!("unix socket is not supported"));
}
};
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,
None => MAX_REQUEST_SIZE + 1,
};
if request_content_length > MAX_REQUEST_SIZE {
return Err(anyhow::anyhow!(
"request is too large (max {MAX_REQUEST_SIZE} bytes)"
));
}
//
// Read the query and query params from the request body
//
let body = hyper::body::to_bytes(request.into_body()).await?;
let QueryData { query, params } = serde_json::from_slice(&body)?;
let query_params = json_to_pg_text(params)?;
//
// Connect to the destination
//
let (client, connection) = tokio_postgres::Config::new()
.host(host)
.port(port)
.user(&username)
.password(&password)
.dbname(&dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect(tokio_postgres::NoTls)
.await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
//
// Now execute the query and return the result
//
let row_stream = client.query_raw_txt(query, query_params).await?;
// Manually drain the stream into a vector so that row_stream stays around
// afterwards and we can get the command tag from it. Also check that the
// response is not too big.
pin_mut!(row_stream);
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
let mut current_size = 0;
while let Some(row) = row_stream.next().await {
let row = row?;
current_size += row.body_len();
rows.push(row);
if current_size > MAX_RESPONSE_SIZE {
return Err(anyhow::anyhow!("response too large"));
}
}
// grab the command tag and number of rows affected
let command_tag = row_stream.command_tag().unwrap_or_default();
let mut command_tag_split = command_tag.split(' ');
let command_tag_name = command_tag_split.next().unwrap_or_default();
let command_tag_count = if command_tag_name == "INSERT" {
// INSERT returns OID first and then number of rows
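// (e.g. the tag "INSERT 0 5" means 5 rows were inserted)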
command_tag_split.nth(1)
} else {
// other commands return number of rows (if any)
command_tag_split.next()
}
.and_then(|s| s.parse::<i64>().ok());
let fields = if !rows.is_empty() {
rows[0]
.columns()
.iter()
.map(|c| {
json!({
"name": Value::String(c.name().to_owned()),
"dataTypeID": Value::Number(c.type_().oid().into()),
})
})
.collect::<Vec<_>>()
} else {
Vec::new()
};
// convert rows to JSON
let rows = rows
.iter()
.map(pg_text_row_to_json)
.collect::<Result<Vec<_>, _>>()?;
// the resulting JSON format is based on the node-postgres result format
Ok(json!({
"command": command_tag_name,
"rowCount": command_tag_count,
"rows": rows,
"fields": fields,
}))
}
//
// Convert postgres row with text-encoded values to JSON object
//
pub fn pg_text_row_to_json(row: &Row) -> Result<Value, anyhow::Error> {
let res = row
.columns()
.iter()
.enumerate()
.map(|(i, column)| {
let name = column.name();
let pg_value = row.as_text(i)?;
let json_value = pg_text_to_json(pg_value, column.type_())?;
Ok((name.to_string(), json_value))
})
.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
Ok(Value::Object(res))
}
//
// Convert postgres text-encoded value to JSON value
//
pub fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
if let Some(val) = pg_value {
if val == "NULL" {
return Ok(Value::Null);
}
if let Kind::Array(elem_type) = pg_type.kind() {
return pg_array_parse(val, elem_type);
}
match *pg_type {
Type::BOOL => Ok(Value::Bool(val == "t")),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
Ok(Value::Number(serde_json::Number::from(val)))
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
Ok(Value::Number(num))
} else {
// Pass NaN, Inf, -Inf as strings.
// JS JSON.stringify() converts them to null, but we
// want to preserve them, so we pass them as strings
Ok(Value::String(val.to_string()))
}
}
Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
_ => Ok(Value::String(val.to_string())),
}
} else {
Ok(Value::Null)
}
}
//
// Parse postgres array into JSON array.
//
// This is a bit involved because we need to handle nested arrays and quoted
// values. Unlike postgres, we don't check that all nested arrays have the same
// dimensions; we just return them as is.
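// (e.g. "{{1,2},{3}}" parses to [[1,2],[3]]).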
//
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, anyhow::Error> {
_pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v)
}
fn _pg_array_parse(
pg_array: &str,
elem_type: &Type,
nested: bool,
) -> Result<(Value, usize), anyhow::Error> {
let mut pg_array_chr = pg_array.char_indices();
let mut level = 0;
let mut quote = false;
let mut entries: Vec<Value> = Vec::new();
let mut entry = String::new();
// skip bounds decoration
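// (e.g. the "[1:1][-2:-1]=" prefix in "[1:1][-2:-1]={{1,2,3},{4,5,6}}")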
if let Some('[') = pg_array.chars().next() {
for (_, c) in pg_array_chr.by_ref() {
if c == '=' {
break;
}
}
}
while let Some((mut i, mut c)) = pg_array_chr.next() {
let mut escaped = false;
if c == '\\' {
escaped = true;
(i, c) = pg_array_chr.next().unwrap();
}
match c {
'{' if !quote => {
level += 1;
if level > 1 {
let (res, off) = _pg_array_parse(&pg_array[i..], elem_type, true)?;
entries.push(res);
for _ in 0..off - 1 {
pg_array_chr.next();
}
}
}
'}' => {
level -= 1;
if level == 0 {
if !entry.is_empty() {
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
}
if nested {
return Ok((Value::Array(entries), i));
}
}
}
'"' if !escaped => {
if quote {
// push even if empty
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry = String::new();
}
quote = !quote;
}
',' if !quote => {
if !entry.is_empty() {
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry = String::new();
}
}
_ => {
entry.push(c);
}
}
}
if level != 0 {
return Err(anyhow::anyhow!("unbalanced array"));
}
Ok((Value::Array(entries), 0))
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_atomic_types_to_pg_params() {
let json = vec![Value::Bool(true), Value::Bool(false)];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["true", "false"]);
let json = vec![Value::Number(serde_json::Number::from(42))];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["42"]);
let json = vec![Value::String("foo\"".to_string())];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["foo\""]);
let json = vec![Value::Null];
let pg_params = json_to_pg_text(json).unwrap();
assert_eq!(pg_params, vec!["null"]);
}
#[test]
fn test_json_array_to_pg_array() {
// atoms and escaping
let json = "[true, false, null, 42, \"foo\", \"bar\\\"-\\\\\"]";
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]).unwrap();
assert_eq!(
pg_params,
vec!["{true,false,null,42,\"foo\",\"bar\\\"-\\\\\"}"]
);
// nested arrays
let json = "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]";
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]).unwrap();
assert_eq!(
pg_params,
vec!["{{true,false},{null,42},{\"foo\",\"bar\\\"-\\\\\"}}"]
);
}
#[test]
fn test_atomic_types_parse() {
assert_eq!(
pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
json!("foo")
);
assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
assert_eq!(
pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
json!("42")
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
json!("NaN")
);
assert_eq!(
pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
json!("Infinity")
);
assert_eq!(
pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
json!("-Infinity")
);
let json: Value =
serde_json::from_str("{\"s\":\"str\",\"n\":42,\"f\":4.2,\"a\":[null,3,\"a\"]}")
.unwrap();
assert_eq!(
pg_text_to_json(
Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
&Type::JSONB
)
.unwrap(),
json
);
}
#[test]
fn test_pg_array_parse_text() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::TEXT).unwrap()
}
assert_eq!(
pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
json!(["aa\"\\,a", "cha", "bbbb"])
);
assert_eq!(
pt(r#"{{"foo","bar"},{"bee","bop"}}"#),
json!([["foo", "bar"], ["bee", "bop"]])
);
assert_eq!(
pt(r#"{{{{"foo",NULL,"bop",bup}}}}"#),
json!([[[["foo", null, "bop", "bup"]]]])
);
assert_eq!(
pt(r#"{{"1",2,3},{4,NULL,6},{NULL,NULL,NULL}}"#),
json!([["1", "2", "3"], ["4", null, "6"], [null, null, null]])
);
}
#[test]
fn test_pg_array_parse_bool() {
fn pb(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::BOOL).unwrap()
}
assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
assert_eq!(
pb(r#"{{t,f},{f,t}}"#),
json!([[true, false], [false, true]])
);
assert_eq!(
pb(r#"{{t,NULL},{NULL,f}}"#),
json!([[true, null], [null, false]])
);
}
#[test]
fn test_pg_array_parse_numbers() {
fn pn(pg_arr: &str, ty: &Type) -> Value {
pg_array_parse(pg_arr, ty).unwrap()
}
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT8), json!(["1", "2", "3"]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT4), json!([1.0, 2.0, 3.0]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT8), json!([1.0, 2.0, 3.0]));
assert_eq!(
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT4),
json!([1.1, 2.2, 3.3])
);
assert_eq!(
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT8),
json!([1.1, 2.2, 3.3])
);
assert_eq!(
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT4),
json!(["NaN", "Infinity", "-Infinity"])
);
assert_eq!(
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT8),
json!(["NaN", "Infinity", "-Infinity"])
);
}
#[test]
fn test_pg_array_with_decoration() {
fn p(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::INT2).unwrap()
}
assert_eq!(
p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
json!([[[1, 2, 3], [4, 5, 6]]])
);
}
}


@@ -4,12 +4,17 @@ use crate::{
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
use hyper::{
-server::{accept, conn::AddrIncoming},
+server::{
+accept,
+conn::{AddrIncoming, AddrStream},
+},
upgrade::Upgraded,
-Body, Request, Response, StatusCode,
+Body, Method, Request, Response, StatusCode,
};
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
use pin_project_lite::pin_project;
use serde_json::{json, Value};
use std::{
convert::Infallible,
future::ready,
@@ -21,6 +26,7 @@ use tls_listener::TlsListener;
use tokio::{
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
net::TcpListener,
select,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
@@ -30,6 +36,8 @@ use utils::http::{error::ApiError, json::json_response};
// Tracking issue: https://github.com/rust-lang/rust/issues/98407.
use sync_wrapper::SyncWrapper;
use super::sql_over_http;
pin_project! {
/// This is a wrapper around a [`WebSocketStream`] that
/// implements [`AsyncRead`] and [`AsyncWrite`].
@@ -159,6 +167,7 @@ async fn ws_handler(
config: &'static ProxyConfig,
cancel_map: Arc<CancelMap>,
session_id: uuid::Uuid,
sni_hostname: Option<String>,
) -> Result<Response<Body>, ApiError> {
let host = request
.headers()
@@ -181,8 +190,44 @@ async fn ws_handler(
// Return the response so the spawned future can continue.
Ok(response)
// TODO: this deserves a refactor, as this function now also handles HTTP JSON clients besides websockets.
// Right now I don't want to blow up the sql-over-http patch with file renames, so that will be a follow-up.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
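// Cap query execution at 10 seconds so that a slow query cannot hold
// the HTTP connection open indefinitely.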
let result = select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
Err(anyhow::anyhow!("Query timed out"))
}
response = sql_over_http::handle(config, request, sni_hostname) => {
response
}
};
let status_code = match result {
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::BAD_REQUEST,
};
let json = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null,
},
None => Value::Null,
};
json!({ "message": message, "code": code })
}
};
json_response(status_code, json).map(|mut r| {
r.headers_mut().insert(
"Access-Control-Allow-Origin",
hyper::http::HeaderValue::from_static("*"),
);
r
})
} else {
-json_response(StatusCode::OK, "Connect with a websocket client")
+json_response(StatusCode::BAD_REQUEST, "query is not supported")
}
}
@@ -216,20 +261,27 @@ pub async fn task_main(
}
});
-let make_svc = hyper::service::make_service_fn(|_stream| async move {
-Ok::<_, Infallible>(hyper::service::service_fn(
-move |req: Request<Body>| async move {
-let cancel_map = Arc::new(CancelMap::default());
-let session_id = uuid::Uuid::new_v4();
-ws_handler(req, config, cancel_map, session_id)
-.instrument(info_span!(
-"ws-client",
-session = format_args!("{session_id}")
-))
-.await
-},
-))
-});
+let make_svc =
+hyper::service::make_service_fn(|stream: &tokio_rustls::server::TlsStream<AddrStream>| {
+let sni_name = stream.get_ref().1.sni_hostname().map(|s| s.to_string());
+async move {
+Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
+let sni_name = sni_name.clone();
+async move {
+let cancel_map = Arc::new(CancelMap::default());
+let session_id = uuid::Uuid::new_v4();
+ws_handler(req, config, cancel_map, session_id, sni_name)
+.instrument(info_span!(
+"ws-client",
+session = format_args!("{session_id}")
+))
+.await
+}
+}))
+}
+});
hyper::Server::builder(accept::from_stream(tls_listener))
.serve(make_svc)