From 9bba31bf6805e1c179b75fbb5bcab96c96980c75 Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Fri, 11 Jul 2025 20:39:08 +0100
Subject: [PATCH] proxy: encode json as we parse rows (#11992)

Serialize query row responses directly into JSON. Some of this code
should be using the `json::value_as_object/list` macros, but I've
avoided it for now to minimize the size of the diff.
---
 Cargo.lock                            |   1 +
 proxy/Cargo.toml                      |   1 +
 proxy/src/serverless/json.rs          |  95 +++++++---------
 proxy/src/serverless/sql_over_http.rs | 154 +++++++++++++-------------
 4 files changed, 122 insertions(+), 129 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4323254f0a..14b460005a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5329,6 +5329,7 @@ dependencies = [
  "itoa",
  "jose-jwa",
  "jose-jwk",
+ "json",
  "lasso",
  "measured",
  "metrics",
diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml
index 0a406d1ca8..82fe6818e3 100644
--- a/proxy/Cargo.toml
+++ b/proxy/Cargo.toml
@@ -49,6 +49,7 @@ indexmap = { workspace = true, features = ["serde"] }
 ipnet.workspace = true
 itertools.workspace = true
 itoa.workspace = true
+json = { path = "../libs/proxy/json" }
 lasso = { workspace = true, features = ["multi-threaded"] }
 measured = { workspace = true, features = ["lasso"] }
 metrics.workspace = true
diff --git a/proxy/src/serverless/json.rs b/proxy/src/serverless/json.rs
index 2e67d07079..ef7c8a4d82 100644
--- a/proxy/src/serverless/json.rs
+++ b/proxy/src/serverless/json.rs
@@ -1,6 +1,7 @@
+use json::{ListSer, ObjectSer, ValueSer};
 use postgres_client::Row;
 use postgres_client::types::{Kind, Type};
-use serde_json::{Map, Value};
+use serde_json::Value;
 
 //
 // Convert json non-string types to strings, so that they can be passed to Postgres
@@ -74,44 +75,40 @@ pub(crate) enum JsonConversionError {
     UnbalancedString,
 }
 
-enum OutputMode {
-    Array(Vec<Value>),
-    Object(Map<String, Value>),
+enum OutputMode<'a> {
+    Array(ListSer<'a>),
+    Object(ObjectSer<'a>),
 }
 
-impl OutputMode {
-    fn key(&mut self, key: &str) -> &mut Value {
+impl OutputMode<'_> {
+    fn key(&mut self, key: &str) -> ValueSer<'_> {
         match self {
-            OutputMode::Array(values) => push_entry(values, Value::Null),
-            OutputMode::Object(map) => map.entry(key.to_string()).or_insert(Value::Null),
+            OutputMode::Array(values) => values.entry(),
+            OutputMode::Object(map) => map.key(key),
         }
     }
 
-    fn finish(self) -> Value {
+    fn finish(self) {
         match self {
-            OutputMode::Array(values) => Value::Array(values),
-            OutputMode::Object(map) => Value::Object(map),
+            OutputMode::Array(values) => values.finish(),
+            OutputMode::Object(map) => map.finish(),
         }
     }
 }
 
-fn push_entry<T>(arr: &mut Vec<T>, t: T) -> &mut T {
-    arr.push(t);
-    arr.last_mut().expect("a value was just inserted")
-}
-
 //
 // Convert postgres row with text-encoded values to JSON object
 //
 pub(crate) fn pg_text_row_to_json(
+    output: ValueSer,
     row: &Row,
     raw_output: bool,
    array_mode: bool,
-) -> Result<Value, JsonConversionError> {
+) -> Result<(), JsonConversionError> {
     let mut entries = if array_mode {
-        OutputMode::Array(Vec::with_capacity(row.columns().len()))
+        OutputMode::Array(output.list())
     } else {
-        OutputMode::Object(Map::with_capacity(row.columns().len()))
+        OutputMode::Object(output.object())
     };
 
     for (i, column) in row.columns().iter().enumerate() {
@@ -120,53 +117,48 @@ pub(crate) fn pg_text_row_to_json(
         let value = entries.key(column.name());
 
         match pg_value {
-            Some(v) if raw_output => *value = Value::String(v.to_string()),
+            Some(v) if raw_output => value.value(v),
             Some(v) => pg_text_to_json(value, v, column.type_())?,
-            None => *value = Value::Null,
+            None => value.value(json::Null),
         }
     }
 
-    Ok(entries.finish())
+    entries.finish();
+    Ok(())
 }
 
 //
 // Convert postgres text-encoded value to JSON value
 //
-fn pg_text_to_json(
-    output: &mut Value,
-    val: &str,
-    pg_type: &Type,
-) -> Result<(), JsonConversionError> {
+fn pg_text_to_json(output: ValueSer, val: &str, pg_type: &Type) -> Result<(), JsonConversionError> {
     if let Kind::Array(elem_type) = pg_type.kind() {
         // todo: we should fetch this from postgres.
         let delimiter = ',';
-        let mut array = vec![];
-        pg_array_parse(&mut array, val, elem_type, delimiter)?;
-        *output = Value::Array(array);
+        json::value_as_list!(|output| pg_array_parse(output, val, elem_type, delimiter)?);
         return Ok(());
     }
 
     match *pg_type {
-        Type::BOOL => *output = Value::Bool(val == "t"),
+        Type::BOOL => output.value(val == "t"),
         Type::INT2 | Type::INT4 => {
             let val = val.parse::<i32>()?;
-            *output = Value::Number(serde_json::Number::from(val));
+            output.value(val);
         }
         Type::FLOAT4 | Type::FLOAT8 => {
             let fval = val.parse::<f64>()?;
-            let num = serde_json::Number::from_f64(fval);
-            if let Some(num) = num {
-                *output = Value::Number(num);
+            if fval.is_finite() {
+                output.value(fval);
             } else {
                 // Pass Nan, Inf, -Inf as strings
                 // JS JSON.stringify() converts them to null, but we
                 // want to preserve them, so we pass them as strings
-                *output = Value::String(val.to_string());
+                output.value(val);
             }
         }
-        Type::JSON | Type::JSONB => *output = serde_json::from_str(val)?,
-        _ => *output = Value::String(val.to_string()),
+        // we assume that the string value is valid json.
+        Type::JSON | Type::JSONB => output.write_raw_json(val.as_bytes()),
+        _ => output.value(val),
    }
 
     Ok(())
@@ -192,7 +184,7 @@ fn pg_text_to_json(
 /// gets its own level of curly braces, and delimiters must be written between adjacent
 /// curly-braced entities of the same level.
 fn pg_array_parse(
-    elements: &mut Vec<Value>,
+    elements: &mut ListSer,
     mut pg_array: &str,
     elem: &Type,
     delim: char,
@@ -221,7 +213,7 @@ fn pg_array_parse(
 /// reads a single array from the `pg_array` string and pushes each value to `elements`.
 /// returns the rest of the `pg_array` string that was not read.
 fn pg_array_parse_inner<'a>(
-    elements: &mut Vec<Value>,
+    elements: &mut ListSer,
     mut pg_array: &'a str,
     elem: &Type,
     delim: char,
@@ -234,7 +226,7 @@ fn pg_array_parse_inner<'a>(
     let mut q = String::new();
 
     loop {
-        let value = push_entry(elements, Value::Null);
+        let value = elements.entry();
         pg_array = pg_array_parse_item(value, &mut q, pg_array, elem, delim)?;
 
         // check for separator.
@@ -260,7 +252,7 @@ fn pg_array_parse_inner<'a>(
 ///
 /// `quoted` is a scratch allocation that has no defined output.
 fn pg_array_parse_item<'a>(
-    output: &mut Value,
+    output: ValueSer,
     quoted: &mut String,
     mut pg_array: &'a str,
     elem: &Type,
@@ -276,9 +268,8 @@ fn pg_array_parse_item<'a>(
 
     if pg_array.starts_with('{') {
         // nested array.
-        let mut nested = vec![];
-        pg_array = pg_array_parse_inner(&mut nested, pg_array, elem, delim)?;
-        *output = Value::Array(nested);
+        pg_array =
+            json::value_as_list!(|output| pg_array_parse_inner(output, pg_array, elem, delim))?;
         return Ok(pg_array);
     }
 
@@ -306,7 +297,7 @@ fn pg_array_parse_item<'a>(
     // we might have an item string:
     // check for null
     if item == "NULL" {
-        *output = Value::Null;
+        output.value(json::Null);
     } else {
         pg_text_to_json(output, item, elem)?;
     }
@@ -440,15 +431,15 @@ mod tests {
     }
 
     fn pg_text_to_json(val: &str, pg_type: &Type) -> Value {
-        let mut v = Value::Null;
-        super::pg_text_to_json(&mut v, val, pg_type).unwrap();
-        v
+        let output = json::value_to_string!(|v| super::pg_text_to_json(v, val, pg_type).unwrap());
+        serde_json::from_str(&output).unwrap()
     }
 
     fn pg_array_parse(pg_array: &str, pg_type: &Type) -> Value {
-        let mut array = vec![];
-        super::pg_array_parse(&mut array, pg_array, pg_type, ',').unwrap();
-        Value::Array(array)
+        let output = json::value_to_string!(|v| json::value_as_list!(|v| {
+            super::pg_array_parse(v, pg_array, pg_type, ',').unwrap();
+        }));
+        serde_json::from_str(&output).unwrap()
     }
 
     #[test]
diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs
index 7a718d0280..8a14f804b6 100644
--- a/proxy/src/serverless/sql_over_http.rs
+++ b/proxy/src/serverless/sql_over_http.rs
@@ -14,10 +14,7 @@ use hyper::http::{HeaderName, HeaderValue};
 use hyper::{Request, Response, StatusCode, header};
 use indexmap::IndexMap;
 use postgres_client::error::{DbError, ErrorPosition, SqlState};
-use postgres_client::{
-    GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction,
-};
-use serde::Serialize;
+use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
 use serde_json::Value;
 use serde_json::value::RawValue;
 use tokio::time::{self, Instant};
@@ -687,32 +684,21 @@ impl QueryData {
         let (inner, mut discard) = client.inner();
         let cancel_token = inner.cancel_token();
 
-        match select(
+        let mut json_buf = vec![];
+
+        let batch_result = match select(
             pin!(query_to_json(
                 config,
                 &mut *inner,
                 self,
-                &mut 0,
+                json::ValueSer::new(&mut json_buf),
                 parsed_headers
             )),
             pin!(cancel.cancelled()),
         )
         .await
         {
-            // The query successfully completed.
-            Either::Left((Ok((status, results)), __not_yet_cancelled)) => {
-                discard.check_idle(status);
-
-                let json_output =
-                    serde_json::to_string(&results).expect("json serialization should not fail");
-                Ok(json_output)
-            }
-            // The query failed with an error
-            Either::Left((Err(e), __not_yet_cancelled)) => {
-                discard.discard();
-                Err(e)
-            }
-            // The query was cancelled.
+            Either::Left((res, __not_yet_cancelled)) => res,
             Either::Right((_cancelled, query)) => {
                 tracing::info!("cancelling query");
                 if let Err(err) = cancel_token.cancel_query(NoTls).await {
@@ -721,13 +707,7 @@ impl QueryData {
                 // wait for the query cancellation
                 match time::timeout(time::Duration::from_millis(100), query).await {
                     // query succeeded before it was cancelled.
-                    Ok(Ok((status, results))) => {
-                        discard.check_idle(status);
-
-                        let json_output = serde_json::to_string(&results)
-                            .expect("json serialization should not fail");
-                        Ok(json_output)
-                    }
+                    Ok(Ok(status)) => Ok(status),
                     // query failed or was cancelled.
                    Ok(Err(error)) => {
                        let db_error = match &error {
@@ -743,14 +723,29 @@ impl QueryData {
                            discard.discard();
                        }
 
-                    Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres))
+                    return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
                 }
                 Err(_timeout) => {
                     discard.discard();
-                    Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres))
+                    return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
                 }
             }
         }
+        };
+
+        match batch_result {
+            // The query successfully completed.
+            Ok(status) => {
+                discard.check_idle(status);
+
+                let json_output = String::from_utf8(json_buf).expect("json should be valid utf8");
+                Ok(json_output)
+            }
+            // The query failed with an error
+            Err(e) => {
+                discard.discard();
+                Err(e)
+            }
+        }
     }
 }
@@ -787,7 +782,7 @@ impl BatchQueryData {
             })
             .map_err(SqlOverHttpError::Postgres)?;
 
-        let json_output = match query_batch(
+        let json_output = match query_batch_to_json(
             config,
             cancel.child_token(),
             &mut transaction,
@@ -845,24 +840,21 @@ async fn query_batch(
     transaction: &mut Transaction<'_>,
     queries: BatchQueryData,
     parsed_headers: HttpHeaders,
-) -> Result<String, SqlOverHttpError> {
-    let mut results = Vec::with_capacity(queries.queries.len());
-    let mut current_size = 0;
+    results: &mut json::ListSer<'_>,
+) -> Result<(), SqlOverHttpError> {
     for stmt in queries.queries {
         let query = pin!(query_to_json(
             config,
             transaction,
             stmt,
-            &mut current_size,
+            results.entry(),
             parsed_headers,
         ));
 
         let cancelled = pin!(cancel.cancelled());
         let res = select(query, cancelled).await;
         match res {
             // TODO: maybe we should check that the transaction bit is set here
-            Either::Left((Ok((_, values)), _cancelled)) => {
-                results.push(values);
-            }
+            Either::Left((Ok(_), _cancelled)) => {}
             Either::Left((Err(e), _cancelled)) => {
                 return Err(e);
             }
@@ -872,8 +864,22 @@ async fn query_batch(
         }
     }
 
-    let results = json!({ "results": results });
-    let json_output = serde_json::to_string(&results).expect("json serialization should not fail");
+    Ok(())
+}
+
+async fn query_batch_to_json(
+    config: &'static HttpConfig,
+    cancel: CancellationToken,
+    tx: &mut Transaction<'_>,
+    queries: BatchQueryData,
+    headers: HttpHeaders,
+) -> Result<String, SqlOverHttpError> {
+    let json_output = json::value_to_string!(|obj| json::value_as_object!(|obj| {
+        let results = obj.key("results");
+        json::value_as_list!(|results| {
+            query_batch(config, cancel, tx, queries, headers, results).await?;
+        });
+    }));
 
     Ok(json_output)
 }
@@ -882,54 +888,54 @@ async fn query_to_json<T: GenericClient>(
     config: &'static HttpConfig,
     client: &mut T,
     data: QueryData,
-    current_size: &mut usize,
+    output: json::ValueSer<'_>,
     parsed_headers: HttpHeaders,
-) -> Result<(ReadyForQueryStatus, impl Serialize + use<>), SqlOverHttpError> {
+) -> Result<ReadyForQueryStatus, SqlOverHttpError> {
     let query_start = Instant::now();
 
-    let query_params = data.params;
+    let mut output = json::ObjectSer::new(output);
 
     let mut row_stream = client
-        .query_raw_txt(&data.query, query_params)
+        .query_raw_txt(&data.query, data.params)
         .await
         .map_err(SqlOverHttpError::Postgres)?;
 
     let query_acknowledged = Instant::now();
 
-    let columns_len = row_stream.statement.columns().len();
-    let mut fields = Vec::with_capacity(columns_len);
-
+    let mut json_fields = output.key("fields").list();
     for c in row_stream.statement.columns() {
-        fields.push(json!({
-            "name": c.name().to_owned(),
-            "dataTypeID": c.type_().oid(),
-            "tableID": c.table_oid(),
-            "columnID": c.column_id(),
-            "dataTypeSize": c.type_size(),
-            "dataTypeModifier": c.type_modifier(),
-            "format": "text",
-        }));
+        let json_field = json_fields.entry();
+        json::value_as_object!(|json_field| {
+            json_field.entry("name", c.name());
+            json_field.entry("dataTypeID", c.type_().oid());
+            json_field.entry("tableID", c.table_oid());
+            json_field.entry("columnID", c.column_id());
+            json_field.entry("dataTypeSize", c.type_size());
+            json_field.entry("dataTypeModifier", c.type_modifier());
+            json_field.entry("format", "text");
+        });
     }
+    json_fields.finish();
 
-    let raw_output = parsed_headers.raw_output;
     let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode);
+    let raw_output = parsed_headers.raw_output;
 
     // Manually drain the stream into a vector to leave row_stream hanging
     // around to get a command tag. Also check that the response is not too
     // big.
-    let mut rows = Vec::new();
+    let mut rows = 0;
+    let mut json_rows = output.key("rows").list();
     while let Some(row) = row_stream.next().await {
         let row = row.map_err(SqlOverHttpError::Postgres)?;
-        *current_size += row.body_len();
 
        // we don't have streaming response support yet, so this is to prevent OOM
        // from a malicious query (e.g. a cross join)
-        if *current_size > config.max_response_size_bytes {
+        if json_rows.as_buffer().len() > config.max_response_size_bytes {
            return Err(SqlOverHttpError::ResponseTooLarge(
                config.max_response_size_bytes,
            ));
        }
 
-        let row = pg_text_row_to_json(&row, raw_output, array_mode)?;
-        rows.push(row);
+        pg_text_row_to_json(json_rows.entry(), &row, raw_output, array_mode)?;
+        rows += 1;
 
        // assumption: parsing pg text and converting to json takes CPU time.
        // let's assume it is slightly expensive, so we should consume some cooperative budget,
        // in case we are processing a very large number
        // of rows and never hit the tokio mpsc for a long time (although unlikely).
        tokio::task::consume_budget().await;
     }
+    json_rows.finish();
 
     let query_resp_end = Instant::now();
-    let RowStream {
-        command_tag,
-        status: ready,
-        ..
-    } = row_stream;
+
+    let ready = row_stream.status;
 
     // grab the command tag and number of rows affected
-    let command_tag = command_tag.unwrap_or_default();
+    let command_tag = row_stream.command_tag.unwrap_or_default();
     let mut command_tag_split = command_tag.split(' ');
     let command_tag_name = command_tag_split.next().unwrap_or_default();
     let command_tag_count = if command_tag_name == "INSERT" {
@@ -959,7 +963,7 @@ async fn query_to_json(
         .and_then(|s| s.parse::<i64>().ok());
 
     info!(
-        rows = rows.len(),
+        rows,
         ?ready,
         command_tag,
         acknowledgement = ?(query_acknowledged - query_start),
@@ -967,16 +971,12 @@ async fn query_to_json(
         "finished executing query"
     );
 
-    // Resulting JSON format is based on the format of node-postgres result.
-    let results = json!({
-        "command": command_tag_name.to_string(),
-        "rowCount": command_tag_count,
-        "rows": rows,
-        "fields": fields,
-        "rowAsArray": array_mode,
-    });
+    output.entry("command", command_tag_name);
+    output.entry("rowCount", command_tag_count);
+    output.entry("rowAsArray", array_mode);
 
-    Ok((ready, results))
+    output.finish();
+    Ok(ready)
 }
 
 enum Client {
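
Note for readers unfamiliar with the new `libs/proxy/json` crate: the patch replaces the old "build a `serde_json::Value` tree, then `serde_json::to_string` it" flow with serializers that write straight into a single output buffer. Below is a minimal, hypothetical sketch of that pattern (it is not part of the patch), using only the API surface visible in this diff (`value_to_string!`, `value_as_object!`, `value_as_list!`, `key`, `entry`, `finish`); the `rows_to_json` helper and its input type are invented for illustration.

```rust
// Hypothetical example, not part of the patch: stream a slice of (name, id)
// pairs into a node-postgres-like response shape, as query_to_json does above.
fn rows_to_json(rows: &[(&str, i32)]) -> String {
    json::value_to_string!(|root| json::value_as_object!(|root| {
        let list = root.key("rows");
        json::value_as_list!(|list| {
            for (name, id) in rows {
                // each entry() writes directly into the shared output buffer;
                // no intermediate serde_json::Value tree is ever built.
                let row = list.entry();
                json::value_as_object!(|row| {
                    row.entry("name", *name);
                    row.entry("id", *id);
                });
            }
        });
        root.entry("rowCount", rows.len() as i32);
    }))
}
```

Because the buffer always holds the bytes serialized so far, checks like the `max_response_size_bytes` guard can inspect `as_buffer().len()` mid-stream instead of tracking an approximate `current_size` separately.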