proxy: encode json as we parse rows (#11992)

Serialize query row responses directly into JSON. Some of this code
should be using the `json::value_as_object/list` macros, but I've
avoided it for now to minimize the size of the diff.
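
For orientation, below is a minimal sketch of the push-style serialization this commit switches to. The ValueSer/ObjectSer/ListSer names and methods are inferred purely from how the diff uses them, so treat the exact signatures as assumptions rather than the json crate's documented API.

    use json::{ObjectSer, ValueSer};

    // Write a small response object straight into a byte buffer instead of
    // building a serde_json::Value tree and serializing it afterwards.
    fn sketch() -> String {
        let mut buf = Vec::new();
        let mut obj = ObjectSer::new(ValueSer::new(&mut buf));
        obj.entry("command", "SELECT");
        let mut rows = obj.key("rows").list();
        rows.entry().value(1);          // illustrative entry
        rows.entry().value(json::Null); // illustrative NULL entry
        rows.finish();
        obj.finish();
        String::from_utf8(buf).expect("json should be valid utf8")
    }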
Conrad Ludgate
2025-07-11 20:39:08 +01:00
committed by GitHub
parent 380d167b7c
commit 9bba31bf68
4 changed files with 122 additions and 129 deletions

Cargo.lock

@@ -5329,6 +5329,7 @@ dependencies = [
"itoa",
"jose-jwa",
"jose-jwk",
"json",
"lasso",
"measured",
"metrics",


@@ -49,6 +49,7 @@ indexmap = { workspace = true, features = ["serde"] }
ipnet.workspace = true
itertools.workspace = true
itoa.workspace = true
json = { path = "../libs/proxy/json" }
lasso = { workspace = true, features = ["multi-threaded"] }
measured = { workspace = true, features = ["lasso"] }
metrics.workspace = true


@@ -1,6 +1,7 @@
use json::{ListSer, ObjectSer, ValueSer};
use postgres_client::Row;
use postgres_client::types::{Kind, Type};
use serde_json::{Map, Value};
use serde_json::Value;
//
// Convert json non-string types to strings, so that they can be passed to Postgres
@@ -74,44 +75,40 @@ pub(crate) enum JsonConversionError {
UnbalancedString,
}
enum OutputMode {
Array(Vec<Value>),
Object(Map<String, Value>),
enum OutputMode<'a> {
Array(ListSer<'a>),
Object(ObjectSer<'a>),
}
impl OutputMode {
fn key(&mut self, key: &str) -> &mut Value {
impl OutputMode<'_> {
fn key(&mut self, key: &str) -> ValueSer<'_> {
match self {
OutputMode::Array(values) => push_entry(values, Value::Null),
OutputMode::Object(map) => map.entry(key.to_string()).or_insert(Value::Null),
OutputMode::Array(values) => values.entry(),
OutputMode::Object(map) => map.key(key),
}
}
fn finish(self) -> Value {
fn finish(self) {
match self {
OutputMode::Array(values) => Value::Array(values),
OutputMode::Object(map) => Value::Object(map),
OutputMode::Array(values) => values.finish(),
OutputMode::Object(map) => map.finish(),
}
}
}
fn push_entry<T>(arr: &mut Vec<T>, t: T) -> &mut T {
arr.push(t);
arr.last_mut().expect("a value was just inserted")
}
//
// Convert postgres row with text-encoded values to JSON object
//
pub(crate) fn pg_text_row_to_json(
output: ValueSer,
row: &Row,
raw_output: bool,
array_mode: bool,
) -> Result<Value, JsonConversionError> {
) -> Result<(), JsonConversionError> {
let mut entries = if array_mode {
OutputMode::Array(Vec::with_capacity(row.columns().len()))
OutputMode::Array(output.list())
} else {
OutputMode::Object(Map::with_capacity(row.columns().len()))
OutputMode::Object(output.object())
};
for (i, column) in row.columns().iter().enumerate() {
@@ -120,53 +117,48 @@ pub(crate) fn pg_text_row_to_json(
let value = entries.key(column.name());
match pg_value {
Some(v) if raw_output => *value = Value::String(v.to_string()),
Some(v) if raw_output => value.value(v),
Some(v) => pg_text_to_json(value, v, column.type_())?,
None => *value = Value::Null,
None => value.value(json::Null),
}
}
Ok(entries.finish())
entries.finish();
Ok(())
}
//
// Convert postgres text-encoded value to JSON value
//
fn pg_text_to_json(
output: &mut Value,
val: &str,
pg_type: &Type,
) -> Result<(), JsonConversionError> {
fn pg_text_to_json(output: ValueSer, val: &str, pg_type: &Type) -> Result<(), JsonConversionError> {
if let Kind::Array(elem_type) = pg_type.kind() {
// todo: we should fetch this from postgres.
let delimiter = ',';
let mut array = vec![];
pg_array_parse(&mut array, val, elem_type, delimiter)?;
*output = Value::Array(array);
json::value_as_list!(|output| pg_array_parse(output, val, elem_type, delimiter)?);
return Ok(());
}
match *pg_type {
Type::BOOL => *output = Value::Bool(val == "t"),
Type::BOOL => output.value(val == "t"),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
*output = Value::Number(serde_json::Number::from(val));
output.value(val);
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
*output = Value::Number(num);
if fval.is_finite() {
output.value(fval);
} else {
// Pass NaN, Inf, -Inf as strings
// JS JSON.stringify() converts them to null, but we
// want to preserve them, so we pass them as strings
*output = Value::String(val.to_string());
output.value(val);
}
}
Type::JSON | Type::JSONB => *output = serde_json::from_str(val)?,
_ => *output = Value::String(val.to_string()),
// we assume that the string value is valid json.
Type::JSON | Type::JSONB => output.write_raw_json(val.as_bytes()),
_ => output.value(val),
}
Ok(())
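
A note on the FLOAT4/FLOAT8 branch above: JSON has no representation for NaN or the infinities, which is why the old code's serde_json::Number::from_f64 returned None for them and why the new code checks is_finite() and writes the original Postgres text instead. A standalone illustration (std and serde_json only, nothing from this crate):

    // Postgres renders non-finite float8 values as "NaN", "Infinity" and
    // "-Infinity"; none of them can become a JSON number.
    let fval: f64 = "NaN".parse().unwrap();
    assert!(!fval.is_finite());
    assert!(serde_json::Number::from_f64(fval).is_none());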
@@ -192,7 +184,7 @@ fn pg_text_to_json(
/// gets its own level of curly braces, and delimiters must be written between adjacent
/// curly-braced entities of the same level.
fn pg_array_parse(
elements: &mut Vec<Value>,
elements: &mut ListSer,
mut pg_array: &str,
elem: &Type,
delim: char,
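
Background for the array-parsing functions in this hunk and the ones below: Postgres emits arrays in a text form where every dimension gets its own braces, elements are separated by the type's delimiter (a comma for most types), NULL is spelled bare, and elements containing delimiters or spaces are double-quoted. Illustrative literals (not taken from this diff) and the JSON they should map to:

    let one_dim = "{1,2,3}";           // -> [1, 2, 3]
    let nested  = "{{1,2},{3,4}}";     // -> [[1, 2], [3, 4]]
    let mixed   = r#"{"a b",NULL,c}"#; // -> ["a b", null, "c"]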
@@ -221,7 +213,7 @@ fn pg_array_parse(
/// reads a single array from the `pg_array` string and pushes each value to `elements`.
/// returns the rest of the `pg_array` string that was not read.
fn pg_array_parse_inner<'a>(
elements: &mut Vec<Value>,
elements: &mut ListSer,
mut pg_array: &'a str,
elem: &Type,
delim: char,
@@ -234,7 +226,7 @@ fn pg_array_parse_inner<'a>(
let mut q = String::new();
loop {
let value = push_entry(elements, Value::Null);
let value = elements.entry();
pg_array = pg_array_parse_item(value, &mut q, pg_array, elem, delim)?;
// check for separator.
@@ -260,7 +252,7 @@ fn pg_array_parse_inner<'a>(
///
/// `quoted` is a scratch allocation that has no defined output.
fn pg_array_parse_item<'a>(
output: &mut Value,
output: ValueSer,
quoted: &mut String,
mut pg_array: &'a str,
elem: &Type,
@@ -276,9 +268,8 @@ fn pg_array_parse_item<'a>(
if pg_array.starts_with('{') {
// nested array.
let mut nested = vec![];
pg_array = pg_array_parse_inner(&mut nested, pg_array, elem, delim)?;
*output = Value::Array(nested);
pg_array =
json::value_as_list!(|output| pg_array_parse_inner(output, pg_array, elem, delim))?;
return Ok(pg_array);
}
@@ -306,7 +297,7 @@ fn pg_array_parse_item<'a>(
// we might have an item string:
// check for null
if item == "NULL" {
*output = Value::Null;
output.value(json::Null);
} else {
pg_text_to_json(output, item, elem)?;
}
@@ -440,15 +431,15 @@ mod tests {
}
fn pg_text_to_json(val: &str, pg_type: &Type) -> Value {
let mut v = Value::Null;
super::pg_text_to_json(&mut v, val, pg_type).unwrap();
v
let output = json::value_to_string!(|v| super::pg_text_to_json(v, val, pg_type).unwrap());
serde_json::from_str(&output).unwrap()
}
fn pg_array_parse(pg_array: &str, pg_type: &Type) -> Value {
let mut array = vec![];
super::pg_array_parse(&mut array, pg_array, pg_type, ',').unwrap();
Value::Array(array)
let output = json::value_to_string!(|v| json::value_as_list!(|v| {
super::pg_array_parse(v, pg_array, pg_type, ',').unwrap();
}));
serde_json::from_str(&output).unwrap()
}
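
Hypothetical assertions against the reworked helpers above, assuming serde_json's json! macro is in scope; the concrete test cases in this file are not shown in the diff, and these values simply follow from the conversion rules in pg_text_to_json and pg_array_parse:

    assert_eq!(pg_text_to_json("t", &Type::BOOL), json!(true));
    assert_eq!(pg_text_to_json("NaN", &Type::FLOAT8), json!("NaN"));
    assert_eq!(pg_array_parse("{1,2,3}", &Type::INT4), json!([1, 2, 3]));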
#[test]


@@ -14,10 +14,7 @@ use hyper::http::{HeaderName, HeaderValue};
use hyper::{Request, Response, StatusCode, header};
use indexmap::IndexMap;
use postgres_client::error::{DbError, ErrorPosition, SqlState};
use postgres_client::{
GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction,
};
use serde::Serialize;
use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
use serde_json::Value;
use serde_json::value::RawValue;
use tokio::time::{self, Instant};
@@ -687,32 +684,21 @@ impl QueryData {
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
match select(
let mut json_buf = vec![];
let batch_result = match select(
pin!(query_to_json(
config,
&mut *inner,
self,
&mut 0,
json::ValueSer::new(&mut json_buf),
parsed_headers
)),
pin!(cancel.cancelled()),
)
.await
{
// The query successfully completed.
Either::Left((Ok((status, results)), __not_yet_cancelled)) => {
discard.check_idle(status);
let json_output =
serde_json::to_string(&results).expect("json serialization should not fail");
Ok(json_output)
}
// The query failed with an error
Either::Left((Err(e), __not_yet_cancelled)) => {
discard.discard();
Err(e)
}
// The query was cancelled.
Either::Left((res, __not_yet_cancelled)) => res,
Either::Right((_cancelled, query)) => {
tracing::info!("cancelling query");
if let Err(err) = cancel_token.cancel_query(NoTls).await {
@@ -721,13 +707,7 @@ impl QueryData {
// wait for the query cancellation
match time::timeout(time::Duration::from_millis(100), query).await {
// query succeeded before it was cancelled.
Ok(Ok((status, results))) => {
discard.check_idle(status);
let json_output = serde_json::to_string(&results)
.expect("json serialization should not fail");
Ok(json_output)
}
Ok(Ok(status)) => Ok(status),
// query failed or was cancelled.
Ok(Err(error)) => {
let db_error = match &error {
@@ -743,14 +723,29 @@ impl QueryData {
discard.discard();
}
Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres))
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
Err(_timeout) => {
discard.discard();
Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres))
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
}
}
};
match batch_result {
// The query successfully completed.
Ok(status) => {
discard.check_idle(status);
let json_output = String::from_utf8(json_buf).expect("json should be valid utf8");
Ok(json_output)
}
// The query failed with an error
Err(e) => {
discard.discard();
Err(e)
}
}
}
}
@@ -787,7 +782,7 @@ impl BatchQueryData {
})
.map_err(SqlOverHttpError::Postgres)?;
let json_output = match query_batch(
let json_output = match query_batch_to_json(
config,
cancel.child_token(),
&mut transaction,
@@ -845,24 +840,21 @@ async fn query_batch(
transaction: &mut Transaction<'_>,
queries: BatchQueryData,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let mut results = Vec::with_capacity(queries.queries.len());
let mut current_size = 0;
results: &mut json::ListSer<'_>,
) -> Result<(), SqlOverHttpError> {
for stmt in queries.queries {
let query = pin!(query_to_json(
config,
transaction,
stmt,
&mut current_size,
results.entry(),
parsed_headers,
));
let cancelled = pin!(cancel.cancelled());
let res = select(query, cancelled).await;
match res {
// TODO: maybe we should check that the transaction bit is set here
Either::Left((Ok((_, values)), _cancelled)) => {
results.push(values);
}
Either::Left((Ok(_), _cancelled)) => {}
Either::Left((Err(e), _cancelled)) => {
return Err(e);
}
@@ -872,8 +864,22 @@ async fn query_batch(
}
}
let results = json!({ "results": results });
let json_output = serde_json::to_string(&results).expect("json serialization should not fail");
Ok(())
}
async fn query_batch_to_json(
config: &'static HttpConfig,
cancel: CancellationToken,
tx: &mut Transaction<'_>,
queries: BatchQueryData,
headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let json_output = json::value_to_string!(|obj| json::value_as_object!(|obj| {
let results = obj.key("results");
json::value_as_list!(|results| {
query_batch(config, cancel, tx, queries, headers, results).await?;
});
}));
Ok(json_output)
}
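
A sketch of what the macro nesting in query_batch_to_json produces, assuming value_to_string!, value_as_object! and value_as_list! behave the way they are used in this diff: the whole batch response is one object with a "results" array, and each query_to_json call fills a single entry of that array. The integer entries below stand in for the per-query objects:

    let out = json::value_to_string!(|obj| json::value_as_object!(|obj| {
        let results = obj.key("results");
        json::value_as_list!(|results| {
            // stand-ins for the per-query objects written by query_to_json
            results.entry().value(1);
            results.entry().value(2);
        });
    }));
    assert_eq!(
        serde_json::from_str::<serde_json::Value>(&out).unwrap(),
        serde_json::json!({ "results": [1, 2] })
    );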
@@ -882,54 +888,54 @@ async fn query_to_json<T: GenericClient>(
config: &'static HttpConfig,
client: &mut T,
data: QueryData,
current_size: &mut usize,
output: json::ValueSer<'_>,
parsed_headers: HttpHeaders,
) -> Result<(ReadyForQueryStatus, impl Serialize + use<T>), SqlOverHttpError> {
) -> Result<ReadyForQueryStatus, SqlOverHttpError> {
let query_start = Instant::now();
let query_params = data.params;
let mut output = json::ObjectSer::new(output);
let mut row_stream = client
.query_raw_txt(&data.query, query_params)
.query_raw_txt(&data.query, data.params)
.await
.map_err(SqlOverHttpError::Postgres)?;
let query_acknowledged = Instant::now();
let columns_len = row_stream.statement.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut json_fields = output.key("fields").list();
for c in row_stream.statement.columns() {
fields.push(json!({
"name": c.name().to_owned(),
"dataTypeID": c.type_().oid(),
"tableID": c.table_oid(),
"columnID": c.column_id(),
"dataTypeSize": c.type_size(),
"dataTypeModifier": c.type_modifier(),
"format": "text",
}));
let json_field = json_fields.entry();
json::value_as_object!(|json_field| {
json_field.entry("name", c.name());
json_field.entry("dataTypeID", c.type_().oid());
json_field.entry("tableID", c.table_oid());
json_field.entry("columnID", c.column_id());
json_field.entry("dataTypeSize", c.type_size());
json_field.entry("dataTypeModifier", c.type_modifier());
json_field.entry("format", "text");
});
}
json_fields.finish();
let raw_output = parsed_headers.raw_output;
let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode);
let raw_output = parsed_headers.raw_output;
// Manually drain the stream into a vector to leave row_stream hanging
// around to get a command tag. Also check that the response is not too
// big.
let mut rows = Vec::new();
let mut rows = 0;
let mut json_rows = output.key("rows").list();
while let Some(row) = row_stream.next().await {
let row = row.map_err(SqlOverHttpError::Postgres)?;
*current_size += row.body_len();
// we don't have streaming response support yet, so this is to prevent OOM
// from a malicious query (eg a cross join)
if *current_size > config.max_response_size_bytes {
if json_rows.as_buffer().len() > config.max_response_size_bytes {
return Err(SqlOverHttpError::ResponseTooLarge(
config.max_response_size_bytes,
));
}
let row = pg_text_row_to_json(&row, raw_output, array_mode)?;
rows.push(row);
pg_text_row_to_json(json_rows.entry(), &row, raw_output, array_mode)?;
rows += 1;
// assumption: parsing pg text and converting to json takes CPU time.
// let's assume it is slightly expensive, so we should consume some cooperative budget.
@@ -937,16 +943,14 @@ async fn query_to_json<T: GenericClient>(
// of rows and never hit the tokio mpsc for a long time (although unlikely).
tokio::task::consume_budget().await;
}
json_rows.finish();
let query_resp_end = Instant::now();
let RowStream {
command_tag,
status: ready,
..
} = row_stream;
let ready = row_stream.status;
// grab the command tag and number of rows affected
let command_tag = command_tag.unwrap_or_default();
let command_tag = row_stream.command_tag.unwrap_or_default();
let mut command_tag_split = command_tag.split(' ');
let command_tag_name = command_tag_split.next().unwrap_or_default();
let command_tag_count = if command_tag_name == "INSERT" {
@@ -959,7 +963,7 @@ async fn query_to_json<T: GenericClient>(
.and_then(|s| s.parse::<i64>().ok());
info!(
rows = rows.len(),
rows,
?ready,
command_tag,
acknowledgement = ?(query_acknowledged - query_start),
@@ -967,16 +971,12 @@ async fn query_to_json<T: GenericClient>(
"finished executing query"
);
// Resulting JSON format is based on the format of node-postgres result.
let results = json!({
"command": command_tag_name.to_string(),
"rowCount": command_tag_count,
"rows": rows,
"fields": fields,
"rowAsArray": array_mode,
});
output.entry("command", command_tag_name);
output.entry("rowCount", command_tag_count);
output.entry("rowAsArray", array_mode);
Ok((ready, results))
output.finish();
Ok(ready)
}
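
For reference, one query's response object as assembled above ends up shaped like this (the comment in the code says the format follows node-postgres results). The concrete values below are placeholders; the conversions applied inside "rows" depend on the column type and the raw_output/array_mode flags:

    let _example = serde_json::json!({
        "fields": [{
            "name": "id",
            "dataTypeID": 23,
            "tableID": 0,
            "columnID": 1,
            "dataTypeSize": 4,
            "dataTypeModifier": -1,
            "format": "text"
        }],
        "rows": [{ "id": 1 }],
        "command": "SELECT",
        "rowCount": 1,
        "rowAsArray": false
    });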
enum Client {