Files
neon/proxy/src/http/sql_over_http.rs
Stas Kelvich dad3519351 Add SQL-over-HTTP endpoint to Proxy
This commit introduces an SQL-over-HTTP endpoint in the proxy, with a JSON
response structure resembling that of the node-postgres driver. This method,
using HTTP POST, achieves smaller amortized latencies in edge setups due to
fewer round trips and an enhanced open connection reuse by the v8 engine.

This update involves several intricacies:
1. SQL injection protection: We employed the extended query protocol, modifying
   the rust-postgres driver to send queries in one roundtrip using a text
   protocol rather than binary, bypassing potential issues like those identified
   in https://github.com/sfackler/rust-postgres/issues/1030.

2. Postgres type compatibility: As not all postgres types have binary
   representations (e.g., acl's in pg_class), we adjusted rust-postgres to
   respond with text protocol, simplifying serialization and fixing queries with
   text-only types in response.

3. Data type conversion: Considering JSON supports fewer data types than
   Postgres, we perform conversions where possible, passing all other types as
   strings. Key conversions include:
   - postgres int2, int4, float4, float8 -> json number (NaN and Inf remain
     text)
   - postgres bool, null, text -> json bool, null, string
   - postgres array -> json array
   - postgres json and jsonb -> json object

4. Alignment with node-postgres: To facilitate integration with js libraries,
   we've matched the response structure of node-postgres, returning command tags
   and column oids. Command tag capturing was added to the rust-postgres
   functionality as part of this change.
2023-05-23 20:01:40 +03:00

604 lines
19 KiB
Rust

use futures::pin_mut;
use futures::StreamExt;
use hyper::body::HttpBody;
use hyper::{Body, HeaderMap, Request};
use pq_proto::StartupMessageParams;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
use tokio_postgres::types::Kind;
use tokio_postgres::types::Type;
use tokio_postgres::Row;
use url::Url;
use crate::{auth, config::ProxyConfig, console};
#[derive(serde::Deserialize)]
struct QueryData {
query: String,
params: Vec<serde_json::Value>,
}
/// Value sent as `application_name` for connections opened by this endpoint.
const APP_NAME: &str = "sql_over_http";
/// Upper bound (1 MiB) on the backend message size and on the accumulated
/// size of the result rows.
const MAX_RESPONSE_SIZE: usize = 1 << 20;
/// Upper bound (1 MiB) on the size of the request body.
const MAX_REQUEST_SIZE: u64 = 1 << 20;
/// Render JSON query parameters as the text values passed to Postgres.
///
/// * strings are passed through verbatim (no quoting or escaping);
/// * arrays are rendered as Postgres array literals;
/// * everything else (null, booleans, numbers, objects) is rendered as its
///   JSON text, so a JSON `null` becomes the text `null`.
///
/// Returns the first serialization error encountered, if any.
fn json_to_pg_text(json: Vec<Value>) -> Result<Vec<String>, serde_json::Error> {
    let mut rendered = Vec::with_capacity(json.len());
    for param in &json {
        let text = match param {
            // top-level strings need no escaping
            Value::String(s) => s.clone(),
            // arrays have their own literal syntax
            Value::Array(_) => json_array_to_pg_array(param)?,
            Value::Null | Value::Bool(_) | Value::Number(_) | Value::Object(_) => {
                serde_json::to_string(param)?
            }
        };
        rendered.push(text);
    }
    Ok(rendered)
}
//
// Serialize a JSON array to a Postgres array. Contrary to the strings in the params
// in the array we need to escape the strings. Postgres is okay with arrays of form
// '{1,"2",3}'::int[], so we don't check that array holds values of the same type, leaving
// it for Postgres to check.
//
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
//
fn json_array_to_pg_array(value: &Value) -> Result<String, serde_json::Error> {
match value {
// same
Value::Null => serde_json::to_string(value),
Value::Bool(_) => serde_json::to_string(value),
Value::Number(_) => serde_json::to_string(value),
Value::Object(_) => serde_json::to_string(value),
// now needs to be escaped, as it is part of the array
Value::String(_) => serde_json::to_string(value),
// recurse into array
Value::Array(arr) => {
let vals = arr
.iter()
.map(json_array_to_pg_array)
.collect::<Result<Vec<_>, _>>()?
.join(",");
Ok(format!("{{{}}}", vals))
}
}
}
/// Extract `(username, dbname, hostname, password)` from the
/// `Neon-Connection-String` request header.
///
/// The header must hold a `postgres:` / `postgresql:` URL with a non-empty
/// username, a password and a host. The URL host must equal `sni_hostname`
/// and, when a readable `host` header is present, also that header's value
/// with any `:port` suffix removed.
///
/// # Errors
///
/// Fails if the header is missing or not valid text, if the URL cannot be
/// parsed or lacks a required part, if no SNI hostname was provided, or if
/// the hostnames disagree.
fn get_conn_info(
    headers: &HeaderMap,
    sni_hostname: Option<String>,
) -> Result<(String, String, String, String), anyhow::Error> {
    let raw_conn_str = headers
        .get("Neon-Connection-String")
        .ok_or_else(|| anyhow::anyhow!("missing connection string"))?
        .to_str()?;
    let parsed = Url::parse(raw_conn_str)?;

    if !matches!(parsed.scheme(), "postgres" | "postgresql") {
        anyhow::bail!("connection string must start with postgres: or postgresql:");
    }

    // The database name is the first path segment of the URL.
    let dbname = parsed
        .path_segments()
        .ok_or_else(|| anyhow::anyhow!("missing database name"))?
        .next()
        .ok_or_else(|| anyhow::anyhow!("invalid database name"))?;

    let username = parsed.username();
    if username.is_empty() {
        anyhow::bail!("missing username");
    }

    let password = parsed
        .password()
        .ok_or_else(|| anyhow::anyhow!("no password"))?;

    // TLS certificate selector now based on SNI hostname, so if we are running here
    // we are sure that SNI hostname is set to one of the configured domain names.
    let sni_hostname = sni_hostname.ok_or_else(|| anyhow::anyhow!("no SNI hostname set"))?;

    let hostname = parsed
        .host_str()
        .ok_or_else(|| anyhow::anyhow!("no host"))?;

    // `host` header with the optional `:port` suffix dropped; an absent or
    // non-text header is simply not checked.
    let host_header = headers
        .get("host")
        .and_then(|raw| raw.to_str().ok())
        .and_then(|text| text.split(':').next());

    if hostname != sni_hostname {
        anyhow::bail!("mismatched SNI hostname and hostname");
    }
    if matches!(host_header, Some(h) if h != hostname) {
        anyhow::bail!("mismatched host header and hostname");
    }

    Ok((
        username.to_owned(),
        dbname.to_owned(),
        hostname.to_owned(),
        password.to_owned(),
    ))
}
// TODO: return different http error codes
/// Serve one SQL-over-HTTP request.
///
/// Steps, in order:
/// 1. read the connection parameters from the request headers;
/// 2. wake up the destination compute node through the auth backend;
/// 3. read `{ "query": ..., "params": [...] }` from the request body
///    (rejected when the body may exceed `MAX_REQUEST_SIZE`);
/// 4. connect to the compute node, run the query with text parameters and
///    collect the rows (rejected when they exceed `MAX_RESPONSE_SIZE`);
/// 5. build a JSON result modelled on the node-postgres result:
///    `command`, `rowCount`, `rows` and `fields`.
///
/// `fields` is derived from the first returned row, so it is empty when the
/// query returns no rows.
///
/// # Errors
///
/// Every failure is reported as an `anyhow::Error`, including the cases where
/// no compute node info, port or host is available (these are returned as
/// errors rather than panicking).
pub async fn handle(
    config: &'static ProxyConfig,
    request: Request<Body>,
    sni_hostname: Option<String>,
) -> anyhow::Result<Value> {
    //
    // Determine the destination and connection params
    //
    let headers = request.headers();
    let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?;
    let credential_params = StartupMessageParams::new([
        ("user", &username),
        ("database", &dbname),
        ("application_name", APP_NAME),
    ]);

    //
    // Wake up the destination if needed. Code here is a bit involved because
    // we reuse the code from the usual proxy and we need to prepare few structures
    // that this code expects.
    //
    let tls = config.tls_config.as_ref();
    let common_names = tls.and_then(|tls| tls.common_names.clone());
    let creds = config
        .auth_backend
        .as_ref()
        .map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names))
        .transpose()?;
    let extra = console::ConsoleReqExtra {
        session_id: uuid::Uuid::new_v4(),
        application_name: Some(APP_NAME),
    };

    // A missing node, port or host is a runtime condition, not a programming
    // error, so report it to the caller instead of panicking.
    let node = creds
        .wake_compute(&extra)
        .await?
        .ok_or_else(|| anyhow::anyhow!("no compute node info available"))?;
    let conf = node.value.config;
    let port = *conf
        .get_ports()
        .first()
        .ok_or_else(|| anyhow::anyhow!("no port"))?;
    let host = match conf
        .get_hosts()
        .first()
        .ok_or_else(|| anyhow::anyhow!("no host"))?
    {
        tokio_postgres::config::Host::Tcp(host) => host,
        tokio_postgres::config::Host::Unix(_) => {
            return Err(anyhow::anyhow!("unix socket is not supported"));
        }
    };

    // A body without a known upper size bound is treated as too large.
    let request_content_length = match request.body().size_hint().upper() {
        Some(v) => v,
        None => MAX_REQUEST_SIZE + 1,
    };
    if request_content_length > MAX_REQUEST_SIZE {
        return Err(anyhow::anyhow!(
            "request is too large (max {MAX_REQUEST_SIZE} bytes)"
        ));
    }

    //
    // Read the query and query params from the request body
    //
    let body = hyper::body::to_bytes(request.into_body()).await?;
    let QueryData { query, params } = serde_json::from_slice(&body)?;
    let query_params = json_to_pg_text(params)?;

    //
    // Connect to the destination
    //
    let (client, connection) = tokio_postgres::Config::new()
        .host(host)
        .port(port)
        .user(&username)
        .password(&password)
        .dbname(&dbname)
        .max_backend_message_size(MAX_RESPONSE_SIZE)
        .connect(tokio_postgres::NoTls)
        .await?;

    // The connection object is awaited in its own task; an error it reports
    // is written to stderr.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    //
    // Now execute the query and return the result
    //
    let row_stream = client.query_raw_txt(query, query_params).await?;

    // Manually drain the stream into a vector to leave row_stream hanging
    // around to get a command tag. Also check that the response is not too
    // big.
    pin_mut!(row_stream);
    let mut rows: Vec<tokio_postgres::Row> = Vec::new();
    let mut response_size = 0;
    while let Some(row) = row_stream.next().await {
        let row = row?;
        response_size += row.body_len();
        // Check the limit before keeping the row.
        if response_size > MAX_RESPONSE_SIZE {
            return Err(anyhow::anyhow!("response too large"));
        }
        rows.push(row);
    }

    // grab the command tag and number of rows affected
    let command_tag = row_stream.command_tag().unwrap_or_default();
    let mut command_tag_split = command_tag.split(' ');
    let command_tag_name = command_tag_split.next().unwrap_or_default();
    let command_tag_count = if command_tag_name == "INSERT" {
        // INSERT returns OID first and then number of rows
        command_tag_split.nth(1)
    } else {
        // other commands return number of rows (if any)
        command_tag_split.next()
    }
    .and_then(|s| s.parse::<i64>().ok());

    // Column descriptions are taken from the first row, if there is one.
    let fields = rows
        .first()
        .map(|row| {
            row.columns()
                .iter()
                .map(|c| {
                    json!({
                        "name": Value::String(c.name().to_owned()),
                        "dataTypeID": Value::Number(c.type_().oid().into()),
                    })
                })
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    // convert rows to JSON
    let rows = rows
        .iter()
        .map(pg_text_row_to_json)
        .collect::<Result<Vec<_>, _>>()?;

    // resulting JSON format is based on the format of node-postgres result
    Ok(json!({
        "command": command_tag_name,
        "rowCount": command_tag_count,
        "rows": rows,
        "fields": fields,
    }))
}
//
// Convert postgres row with text-encoded values to JSON object
//
pub fn pg_text_row_to_json(row: &Row) -> Result<Value, anyhow::Error> {
let res = row
.columns()
.iter()
.enumerate()
.map(|(i, column)| {
let name = column.name();
let pg_value = row.as_text(i)?;
let json_value = pg_text_to_json(pg_value, column.type_())?;
Ok((name.to_string(), json_value))
})
.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
Ok(Value::Object(res))
}
/// Convert a Postgres text-encoded value to a JSON value.
///
/// * `None` (SQL NULL) becomes JSON `null`;
/// * array types are parsed into JSON arrays, element by element;
/// * `bool` becomes a JSON boolean (`"t"` is true, anything else false);
/// * `int2` / `int4` become JSON numbers;
/// * `float4` / `float8` become JSON numbers, except values JSON cannot
///   represent (NaN, Infinity, -Infinity), which are kept as strings;
/// * `json` / `jsonb` are parsed as JSON;
/// * every other type is returned as a JSON string.
///
/// A present value is never treated as NULL here: the text `NULL` in a text
/// column is an ordinary string. NULL array elements are recognised by the
/// array parser, which knows whether an element was quoted.
///
/// # Errors
///
/// Fails when a number or JSON document cannot be parsed, or when an array
/// literal is malformed.
pub fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
    let val = match pg_value {
        Some(val) => val,
        None => return Ok(Value::Null),
    };

    if let Kind::Array(elem_type) = pg_type.kind() {
        return pg_array_parse(val, elem_type);
    }

    match *pg_type {
        Type::BOOL => Ok(Value::Bool(val == "t")),
        Type::INT2 | Type::INT4 => {
            let val = val.parse::<i32>()?;
            Ok(Value::Number(serde_json::Number::from(val)))
        }
        Type::FLOAT4 | Type::FLOAT8 => {
            let fval = val.parse::<f64>()?;
            match serde_json::Number::from_f64(fval) {
                Some(num) => Ok(Value::Number(num)),
                // Pass NaN, Inf, -Inf as strings.
                // JS JSON.stringify() converts them to null, but we
                // want to preserve them, so we pass them as strings.
                None => Ok(Value::String(val.to_string())),
            }
        }
        Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
        _ => Ok(Value::String(val.to_string())),
    }
}

/// Parse a Postgres array literal into a JSON array.
///
/// This is a bit involved because we need to handle nested arrays and quoted
/// values. Unlike Postgres we don't check that all nested arrays have the same
/// dimensions, we just return them as is.
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, anyhow::Error> {
    _pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v)
}

/// Recursive worker behind `pg_array_parse`.
///
/// Parses the array that starts at the beginning of `pg_array`, after
/// skipping an optional bounds decoration such as `[1:3]=`.
///
/// When `nested` is true, parsing stops at the brace that closes this array
/// and the returned `usize` is that brace's byte offset within `pg_array`.
/// When `nested` is false the whole input is consumed and the offset is `0`.
///
/// Element rules:
/// * a backslash makes the following character a literal part of the element;
/// * a quoted element is always a value, even when empty or spelled `NULL`;
/// * an unquoted `NULL` is a JSON `null`;
/// * braces and commas are structural only outside quotes.
///
/// # Errors
///
/// Fails on unbalanced braces, on a backslash at the very end of the input,
/// or when an element cannot be converted to `elem_type`.
fn _pg_array_parse(
    pg_array: &str,
    elem_type: &Type,
    nested: bool,
) -> Result<(Value, usize), anyhow::Error> {
    // Flush a pending unquoted element, if any, into `entries`.
    fn push_unquoted(
        entry: &mut String,
        entries: &mut Vec<Value>,
        elem_type: &Type,
    ) -> Result<(), anyhow::Error> {
        if entry.is_empty() {
            return Ok(());
        }
        // Only an unquoted NULL denotes a null element; a string with the
        // value NULL is written as "NULL" and goes through the quoted path.
        let value = if entry == "NULL" {
            Value::Null
        } else {
            pg_text_to_json(Some(entry.as_str()), elem_type)?
        };
        entries.push(value);
        entry.clear();
        Ok(())
    }

    let mut pg_array_chr = pg_array.char_indices();
    let mut level = 0;
    let mut quote = false;
    let mut entries: Vec<Value> = Vec::new();
    let mut entry = String::new();

    // skip bounds decoration
    if pg_array.starts_with('[') {
        for (_, c) in pg_array_chr.by_ref() {
            if c == '=' {
                break;
            }
        }
    }

    while let Some((i, c)) = pg_array_chr.next() {
        if c == '\\' {
            // The character after a backslash is taken literally.
            let (_, literal) = pg_array_chr
                .next()
                .ok_or_else(|| anyhow::anyhow!("unterminated escape sequence in array"))?;
            entry.push(literal);
            continue;
        }

        match c {
            '{' if !quote => {
                level += 1;
                if level > 1 {
                    let (res, off) = _pg_array_parse(&pg_array[i..], elem_type, true)?;
                    entries.push(res);
                    // `off` is a byte offset, so advance by position rather
                    // than by character count: consume everything up to and
                    // including the sub-array's closing brace.
                    let close = i + off;
                    for (j, _) in pg_array_chr.by_ref() {
                        if j >= close {
                            break;
                        }
                    }
                    level -= 1;
                }
            }
            '}' if !quote => {
                level -= 1;
                if level == 0 {
                    push_unquoted(&mut entry, &mut entries, elem_type)?;
                    if nested {
                        return Ok((Value::Array(entries), i));
                    }
                }
            }
            '"' => {
                if quote {
                    // closing quote: push even if empty
                    entries.push(pg_text_to_json(Some(entry.as_str()), elem_type)?);
                    entry.clear();
                }
                quote = !quote;
            }
            ',' if !quote => {
                push_unquoted(&mut entry, &mut entries, elem_type)?;
            }
            _ => {
                entry.push(c);
            }
        }
    }

    if level != 0 {
        return Err(anyhow::anyhow!("unbalanced array"));
    }

    Ok((Value::Array(entries), 0))
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Scalar JSON parameters are rendered as plain text.
    #[test]
    fn test_atomic_types_to_pg_params() {
        let cases: Vec<(Vec<Value>, Vec<&str>)> = vec![
            (
                vec![Value::Bool(true), Value::Bool(false)],
                vec!["true", "false"],
            ),
            (
                vec![Value::Number(serde_json::Number::from(42))],
                vec!["42"],
            ),
            (vec![Value::String("foo\"".to_string())], vec!["foo\""]),
            (vec![Value::Null], vec!["null"]),
        ];

        for (input, expected) in cases {
            assert_eq!(json_to_pg_text(input).unwrap(), expected);
        }
    }

    /// JSON arrays are rendered as Postgres array literals.
    #[test]
    fn test_json_array_to_pg_array() {
        let cases = vec![
            // atoms and escaping
            (
                "[true, false, null, 42, \"foo\", \"bar\\\"-\\\\\"]",
                "{true,false,null,42,\"foo\",\"bar\\\"-\\\\\"}",
            ),
            // nested arrays
            (
                "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]",
                "{{true,false},{null,42},{\"foo\",\"bar\\\"-\\\\\"}}",
            ),
        ];

        for (input, expected) in cases {
            let parsed: Value = serde_json::from_str(input).unwrap();
            let rendered = json_to_pg_text(vec![parsed]).unwrap();
            assert_eq!(rendered, vec![expected]);
        }
    }

    /// Non-array values are converted according to their Postgres type.
    #[test]
    fn test_atomic_types_parse() {
        let jsonb_expected: Value =
            serde_json::from_str("{\"s\":\"str\",\"n\":42,\"f\":4.2,\"a\":[null,3,\"a\"]}")
                .unwrap();

        let cases: Vec<(Option<&str>, Type, Value)> = vec![
            (Some("foo"), Type::TEXT, json!("foo")),
            (None, Type::TEXT, json!(null)),
            (Some("42"), Type::INT4, json!(42)),
            (Some("42"), Type::INT2, json!(42)),
            (Some("42"), Type::INT8, json!("42")),
            (Some("42.42"), Type::FLOAT8, json!(42.42)),
            (Some("42.42"), Type::FLOAT4, json!(42.42)),
            (Some("NaN"), Type::FLOAT4, json!("NaN")),
            (Some("Infinity"), Type::FLOAT4, json!("Infinity")),
            (Some("-Infinity"), Type::FLOAT4, json!("-Infinity")),
            (
                Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
                Type::JSONB,
                jsonb_expected,
            ),
        ];

        for (text, ty, expected) in cases {
            assert_eq!(pg_text_to_json(text, &ty).unwrap(), expected);
        }
    }

    /// Text arrays: quoting, escaping, nesting and NULL elements.
    #[test]
    fn test_pg_array_parse_text() {
        let cases = vec![
            (
                r#"{"aa\"\\\,a",cha,"bbbb"}"#,
                json!(["aa\"\\,a", "cha", "bbbb"]),
            ),
            (
                r#"{{"foo","bar"},{"bee","bop"}}"#,
                json!([["foo", "bar"], ["bee", "bop"]]),
            ),
            (
                r#"{{{{"foo",NULL,"bop",bup}}}}"#,
                json!([[[["foo", null, "bop", "bup"]]]]),
            ),
            (
                r#"{{"1",2,3},{4,NULL,6},{NULL,NULL,NULL}}"#,
                json!([["1", "2", "3"], ["4", null, "6"], [null, null, null]]),
            ),
        ];

        for (literal, expected) in cases {
            assert_eq!(pg_array_parse(literal, &Type::TEXT).unwrap(), expected);
        }
    }

    /// Boolean arrays, flat and nested, with NULL elements.
    #[test]
    fn test_pg_array_parse_bool() {
        let cases = vec![
            (r#"{t,f,t}"#, json!([true, false, true])),
            (r#"{{t,f,t}}"#, json!([[true, false, true]])),
            (
                r#"{{t,f},{f,t}}"#,
                json!([[true, false], [false, true]]),
            ),
            (
                r#"{{t,NULL},{NULL,f}}"#,
                json!([[true, null], [null, false]]),
            ),
        ];

        for (literal, expected) in cases {
            assert_eq!(pg_array_parse(literal, &Type::BOOL).unwrap(), expected);
        }
    }

    /// Numeric arrays: small ints and floats become numbers, int8 and
    /// non-finite floats stay strings.
    #[test]
    fn test_pg_array_parse_numbers() {
        let cases = vec![
            (r#"{1,2,3}"#, Type::INT4, json!([1, 2, 3])),
            (r#"{1,2,3}"#, Type::INT2, json!([1, 2, 3])),
            (r#"{1,2,3}"#, Type::INT8, json!(["1", "2", "3"])),
            (r#"{1,2,3}"#, Type::FLOAT4, json!([1.0, 2.0, 3.0])),
            (r#"{1,2,3}"#, Type::FLOAT8, json!([1.0, 2.0, 3.0])),
            (r#"{1.1,2.2,3.3}"#, Type::FLOAT4, json!([1.1, 2.2, 3.3])),
            (r#"{1.1,2.2,3.3}"#, Type::FLOAT8, json!([1.1, 2.2, 3.3])),
            (
                r#"{NaN,Infinity,-Infinity}"#,
                Type::FLOAT4,
                json!(["NaN", "Infinity", "-Infinity"]),
            ),
            (
                r#"{NaN,Infinity,-Infinity}"#,
                Type::FLOAT8,
                json!(["NaN", "Infinity", "-Infinity"]),
            ),
        ];

        for (literal, ty, expected) in cases {
            assert_eq!(pg_array_parse(literal, &ty).unwrap(), expected);
        }
    }

    /// A leading bounds decoration (`[lo:hi]...=`) is skipped.
    #[test]
    fn test_pg_array_with_decoration() {
        let parsed =
            pg_array_parse(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#, &Type::INT2).unwrap();
        assert_eq!(parsed, json!([[[1, 2, 3], [4, 5, 6]]]));
    }
}