From d6beb3ffbb8631931c8bae922bec955088354ee5 Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Wed, 2 Jul 2025 13:46:11 +0100
Subject: [PATCH] [proxy] rewrite pg-text to json routines (#12413)

We would like to move towards an arena system for JSON encoding the
responses. This change pushes an "out" parameter into the pg-text to
json routines to make swapping in an arena system easier in the future.
(see #11992)

This additionally removes the redundant `columns: &[Type]` argument, as
well as rewriting the pg_array parser.

---

I rewrote the pg_array parser because, while making these changes, I
found it hard to reason about. I went back to the specification and
rewrote it from scratch. There are 4 separate routines:

1. pg_array_parse - checks for any prelude (multidimensional array ranges)
2. pg_array_parse_inner - only deals with the arrays themselves
3. pg_array_parse_item - parses a single item from the array; this might be
   quoted, unquoted, or another nested array.
4. pg_array_parse_quoted - parses a quoted string, following the relevant
   string escaping rules.
---
 proxy/src/serverless/json.rs          | 488 ++++++++++++++++----
 proxy/src/serverless/sql_over_http.rs |   5 +-
 2 files changed, 304 insertions(+), 189 deletions(-)

diff --git a/proxy/src/serverless/json.rs b/proxy/src/serverless/json.rs
index 1afc10359f..2e67d07079 100644
--- a/proxy/src/serverless/json.rs
+++ b/proxy/src/serverless/json.rs
@@ -70,6 +70,34 @@ pub(crate) enum JsonConversionError {
     ParseJsonError(#[from] serde_json::Error),
     #[error("unbalanced array")]
     UnbalancedArray,
+    #[error("unbalanced quoted string")]
+    UnbalancedString,
+}
+
+enum OutputMode {
+    Array(Vec<Value>),
+    Object(Map<String, Value>),
+}
+
+impl OutputMode {
+    fn key(&mut self, key: &str) -> &mut Value {
+        match self {
+            OutputMode::Array(values) => push_entry(values, Value::Null),
+            OutputMode::Object(map) => map.entry(key.to_string()).or_insert(Value::Null),
+        }
+    }
+
+    fn finish(self) -> Value {
+        match self {
+            OutputMode::Array(values) => Value::Array(values),
+            OutputMode::Object(map) => Value::Object(map),
+        }
+    }
+}
+
+fn push_entry<T>(arr: &mut Vec<T>, t: T) -> &mut T {
+    arr.push(t);
+    arr.last_mut().expect("a value was just inserted")
 }
 
 //
@@ -77,182 +105,277 @@ pub(crate) enum JsonConversionError {
 //
 pub(crate) fn pg_text_row_to_json(
     row: &Row,
-    columns: &[Type],
     raw_output: bool,
     array_mode: bool,
 ) -> Result<Value, JsonConversionError> {
-    let iter = row
-        .columns()
-        .iter()
-        .zip(columns)
-        .enumerate()
-        .map(|(i, (column, typ))| {
-            let name = column.name();
-            let pg_value = row.as_text(i).map_err(JsonConversionError::AsTextError)?;
-            let json_value = if raw_output {
-                match pg_value {
-                    Some(v) => Value::String(v.to_string()),
-                    None => Value::Null,
-                }
-            } else {
-                pg_text_to_json(pg_value, typ)?
-            };
-            Ok((name.to_string(), json_value))
-        });
-
-    if array_mode {
-        // drop keys and aggregate into array
-        let arr = iter
-            .map(|r| r.map(|(_key, val)| val))
-            .collect::<Result<Vec<Value>, JsonConversionError>>()?;
-        Ok(Value::Array(arr))
+    let mut entries = if array_mode {
+        OutputMode::Array(Vec::with_capacity(row.columns().len()))
     } else {
-        let obj = iter.collect::<Result<Map<String, Value>, JsonConversionError>>()?;
-        Ok(Value::Object(obj))
+        OutputMode::Object(Map::with_capacity(row.columns().len()))
+    };
+
+    for (i, column) in row.columns().iter().enumerate() {
+        let pg_value = row.as_text(i).map_err(JsonConversionError::AsTextError)?;
+
+        let value = entries.key(column.name());
+
+        match pg_value {
+            Some(v) if raw_output => *value = Value::String(v.to_string()),
+            Some(v) => pg_text_to_json(value, v, column.type_())?,
+            None => *value = Value::Null,
+        }
     }
+
+    Ok(entries.finish())
 }
 
 //
 // Convert postgres text-encoded value to JSON value
 //
-fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, JsonConversionError> {
-    if let Some(val) = pg_value {
-        if let Kind::Array(elem_type) = pg_type.kind() {
-            return pg_array_parse(val, elem_type);
-        }
+fn pg_text_to_json(
+    output: &mut Value,
+    val: &str,
+    pg_type: &Type,
+) -> Result<(), JsonConversionError> {
+    if let Kind::Array(elem_type) = pg_type.kind() {
+        // todo: we should fetch this from postgres.
+        let delimiter = ',';
 
-        match *pg_type {
-            Type::BOOL => Ok(Value::Bool(val == "t")),
-            Type::INT2 | Type::INT4 => {
-                let val = val.parse::<i32>()?;
-                Ok(Value::Number(serde_json::Number::from(val)))
-            }
-            Type::FLOAT4 | Type::FLOAT8 => {
-                let fval = val.parse::<f64>()?;
-                let num = serde_json::Number::from_f64(fval);
-                if let Some(num) = num {
-                    Ok(Value::Number(num))
-                } else {
-                    // Pass Nan, Inf, -Inf as strings
-                    // JS JSON.stringify() does converts them to null, but we
-                    // want to preserve them, so we pass them as strings
-                    Ok(Value::String(val.to_string()))
-                }
-            }
-            Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
-            _ => Ok(Value::String(val.to_string())),
-        }
-    } else {
-        Ok(Value::Null)
-    }
-}
-
-//
-// Parse postgres array into JSON array.
-//
-// This is a bit involved because we need to handle nested arrays and quoted
-// values. Unlike postgres we don't check that all nested arrays have the same
-// dimensions, we just return them as is.
-//
-fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, JsonConversionError> {
-    pg_array_parse_inner(pg_array, elem_type, false).map(|(v, _)| v)
-}
-
-fn pg_array_parse_inner(
-    pg_array: &str,
-    elem_type: &Type,
-    nested: bool,
-) -> Result<(Value, usize), JsonConversionError> {
-    let mut pg_array_chr = pg_array.char_indices();
-    let mut level = 0;
-    let mut quote = false;
-    let mut entries: Vec<Value> = Vec::new();
-    let mut entry = String::new();
-
-    // skip bounds decoration
-    if let Some('[') = pg_array.chars().next() {
-        for (_, c) in pg_array_chr.by_ref() {
-            if c == '=' {
-                break;
-            }
-        }
+        let mut array = vec![];
+        pg_array_parse(&mut array, val, elem_type, delimiter)?;
+        *output = Value::Array(array);
+        return Ok(());
     }
 
-    fn push_checked(
-        entry: &mut String,
-        entries: &mut Vec<Value>,
-        elem_type: &Type,
-    ) -> Result<(), JsonConversionError> {
-        if !entry.is_empty() {
-            // While in usual postgres response we get nulls as None and everything else
-            // as Some(&str), in arrays we get NULL as unquoted 'NULL' string (while
-            // string with value 'NULL' will be represented by '"NULL"'). So catch NULLs
-            // here while we have quotation info and convert them to None.
-            if entry == "NULL" {
-                entries.push(pg_text_to_json(None, elem_type)?);
+    match *pg_type {
+        Type::BOOL => *output = Value::Bool(val == "t"),
+        Type::INT2 | Type::INT4 => {
+            let val = val.parse::<i32>()?;
+            *output = Value::Number(serde_json::Number::from(val));
+        }
+        Type::FLOAT4 | Type::FLOAT8 => {
+            let fval = val.parse::<f64>()?;
+            let num = serde_json::Number::from_f64(fval);
+            if let Some(num) = num {
+                *output = Value::Number(num);
             } else {
-                entries.push(pg_text_to_json(Some(entry), elem_type)?);
+                // Pass NaN, Inf, -Inf as strings
+                // JS JSON.stringify() converts them to null, but we
+                // want to preserve them, so we pass them as strings
+                *output = Value::String(val.to_string());
             }
-            entry.clear();
         }
-
-        Ok(())
+        Type::JSON | Type::JSONB => *output = serde_json::from_str(val)?,
+        _ => *output = Value::String(val.to_string()),
     }
 
-    while let Some((mut i, mut c)) = pg_array_chr.next() {
-        let mut escaped = false;
+    Ok(())
+}
 
-        if c == '\\' {
-            escaped = true;
-            let Some(x) = pg_array_chr.next() else {
-                return Err(JsonConversionError::UnbalancedArray);
-            };
-            (i, c) = x;
-        }
-
-        match c {
-            '{' if !quote => {
-                level += 1;
-                if level > 1 {
-                    let (res, off) = pg_array_parse_inner(&pg_array[i..], elem_type, true)?;
-                    entries.push(res);
-                    for _ in 0..off - 1 {
-                        pg_array_chr.next();
-                    }
-                }
-            }
-            '}' if !quote => {
-                level -= 1;
-                if level == 0 {
-                    push_checked(&mut entry, &mut entries, elem_type)?;
-                    if nested {
-                        return Ok((Value::Array(entries), i));
-                    }
-                }
-            }
-            '"' if !escaped => {
-                if quote {
-                    // end of quoted string, so push it manually without any checks
-                    // for emptiness or nulls
-                    entries.push(pg_text_to_json(Some(&entry), elem_type)?);
-                    entry.clear();
-                }
-                quote = !quote;
-            }
-            ',' if !quote => {
-                push_checked(&mut entry, &mut entries, elem_type)?;
-            }
-            _ => {
-                entry.push(c);
-            }
-        }
     }
 
+/// Parse postgres array into JSON array.
+///
+/// This is a bit involved because we need to handle nested arrays and quoted
+/// values. Unlike postgres we don't check that all nested arrays have the same
+/// dimensions; we just return them as is.
+///
+/// The external text representation of an array value consists of items that are interpreted
+/// according to the I/O conversion rules for the array's element type, plus decoration that
+/// indicates the array structure. The decoration consists of curly braces (`{` and `}`) around
+/// the array value plus delimiter characters between adjacent items. The delimiter character
+/// is usually a comma (,) but can be something else: it is determined by the typdelim setting
+/// for the array's element type. Among the standard data types provided in the PostgreSQL
+/// distribution, all use a comma, except for type box, which uses a semicolon (;).
+///
+/// In a multidimensional array, each dimension (row, plane, cube, etc.)
+/// gets its own level of curly braces, and delimiters must be written between adjacent
+/// curly-braced entities of the same level.
+fn pg_array_parse(
+    elements: &mut Vec<Value>,
+    mut pg_array: &str,
+    elem: &Type,
+    delim: char,
+) -> Result<(), JsonConversionError> {
+    // skip bounds decoration, e.g.:
+    // `[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}`
+    // technically these are significant, but we have no way to represent them in json.
+    if let Some('[') = pg_array.chars().next() {
+        let Some((_bounds, array)) = pg_array.split_once('=') else {
+            return Err(JsonConversionError::UnbalancedArray);
+        };
+        pg_array = array;
     }
-    if level != 0 {
+    // whitespace might precede a `{`.
+    let pg_array = pg_array.trim_start();
+
+    let rest = pg_array_parse_inner(elements, pg_array, elem, delim)?;
+    if !rest.is_empty() {
         return Err(JsonConversionError::UnbalancedArray);
     }
-    Ok((Value::Array(entries), 0))
+    Ok(())
+}
+
+/// reads a single array from the `pg_array` string and pushes each value to `elements`.
+/// returns the rest of the `pg_array` string that was not read.
+fn pg_array_parse_inner<'a>(
+    elements: &mut Vec<Value>,
+    mut pg_array: &'a str,
+    elem: &Type,
+    delim: char,
+) -> Result<&'a str, JsonConversionError> {
+    // array should have a `{` prefix.
+    pg_array = pg_array
+        .strip_prefix('{')
+        .ok_or(JsonConversionError::UnbalancedArray)?;
+
+    let mut q = String::new();
+
+    loop {
+        let value = push_entry(elements, Value::Null);
+        pg_array = pg_array_parse_item(value, &mut q, pg_array, elem, delim)?;
+
+        // check for separator.
+        if let Some(next) = pg_array.strip_prefix(delim) {
+            // next item.
+            pg_array = next;
+        } else {
+            break;
+        }
+    }
+
+    let Some(next) = pg_array.strip_prefix('}') else {
+        // missing `}` terminator.
+        return Err(JsonConversionError::UnbalancedArray);
+    };
+
+    // whitespace might follow a `}`.
+    Ok(next.trim_start())
+}
+
+/// reads a single item from the `pg_array` string.
+/// returns the rest of the `pg_array` string that was not read.
+///
+/// `quoted` is a scratch allocation that has no defined output.
+fn pg_array_parse_item<'a>(
+    output: &mut Value,
+    quoted: &mut String,
+    mut pg_array: &'a str,
+    elem: &Type,
+    delim: char,
+) -> Result<&'a str, JsonConversionError> {
+    // We are trying to parse an array item.
+    // This could be a new array, if this is a multi-dimensional array.
+    // This could be a quoted string representing `elem`.
+    // This could be an unquoted string representing `elem`.
+
+    // whitespace might precede an item.
+    pg_array = pg_array.trim_start();
+
+    if pg_array.starts_with('{') {
+        // nested array.
+        let mut nested = vec![];
+        pg_array = pg_array_parse_inner(&mut nested, pg_array, elem, delim)?;
+        *output = Value::Array(nested);
+        return Ok(pg_array);
+    }
+
+    if let Some(mut pg_array) = pg_array.strip_prefix('"') {
+        // the parsed string is un-escaped and written into quoted.
+        pg_array = pg_array_parse_quoted(quoted, pg_array)?;
+
+        // we have un-escaped the string, parse it as pg text.
+        pg_text_to_json(output, quoted, elem)?;
+
+        return Ok(pg_array);
+    }
+
+    // we need to parse an item. read until we find a delimiter or `}`.
+    let index = pg_array
+        .find([delim, '}'])
+        .ok_or(JsonConversionError::UnbalancedArray)?;
+
+    let item;
+    (item, pg_array) = pg_array.split_at(index);
+
+    // item might have trailing whitespace that we need to ignore.
+    let item = item.trim_end();
+
+    // we might have an item string:
+    // check for null
+    if item == "NULL" {
+        *output = Value::Null;
+    } else {
+        pg_text_to_json(output, item, elem)?;
+    }
+
+    Ok(pg_array)
+}
+
+/// reads a single quoted item from the `pg_array` string.
+///
+/// Returns the rest of the `pg_array` string that was not read.
+/// The output is written into `quoted`.
+///
+/// The `pg_array` string must still contain the closing `"` terminator, but the
+/// opening `"` must already have been removed from the input. The terminator is consumed.
+fn pg_array_parse_quoted<'a>(
+    quoted: &mut String,
+    mut pg_array: &'a str,
+) -> Result<&'a str, JsonConversionError> {
+    // The array output routine will put double quotes around element values if they are empty strings,
+    // contain curly braces, delimiter characters, double quotes, backslashes, or white space,
+    // or match the word `NULL`. Double quotes and backslashes embedded in element values will be backslash-escaped.
+    // For numeric data types it is safe to assume that double quotes will never appear,
+    // but for textual data types one should be prepared to cope with either the presence or absence of quotes.
+
+    quoted.clear();
+
+    // We write to quoted in chunks terminated by an escape character.
+    // E.g. if we have the input `foo\"bar"`, then we write `foo`, then `"`, then finally `bar`.
+
+    loop {
+        // we need to parse a chunk. read until we find a '\\' or `"`.
+        let i = pg_array
+            .find(['\\', '"'])
+            .ok_or(JsonConversionError::UnbalancedString)?;
+
+        let chunk: &str;
+        (chunk, pg_array) = pg_array
+            .split_at_checked(i)
+            .expect("i is guaranteed to be in-bounds of pg_array");
+
+        // push the chunk.
+        quoted.push_str(chunk);
+
+        // consume the chunk_end character.
+        let chunk_end: char;
+        (chunk_end, pg_array) =
+            split_first_char(pg_array).expect("pg_array should start with either '\\\\' or '\"'");
+
+        // finished.
+        if chunk_end == '"' {
+            // whitespace might follow the '"'.
+            pg_array = pg_array.trim_start();
+
+            break Ok(pg_array);
+        }
+
+        // consume the escaped character.
+        let escaped: char;
+        (escaped, pg_array) =
+            split_first_char(pg_array).ok_or(JsonConversionError::UnbalancedString)?;
+
+        quoted.push(escaped);
+    }
+}
+
+fn split_first_char(s: &str) -> Option<(char, &str)> {
+    let mut chars = s.chars();
+    let c = chars.next()?;
+    Some((c, chars.as_str()))
 }
 
 #[cfg(test)]
@@ -316,37 +439,33 @@ mod tests {
         );
     }
 
+    fn pg_text_to_json(val: &str, pg_type: &Type) -> Value {
+        let mut v = Value::Null;
+        super::pg_text_to_json(&mut v, val, pg_type).unwrap();
+        v
+    }
+
+    fn pg_array_parse(pg_array: &str, pg_type: &Type) -> Value {
+        let mut array = vec![];
+        super::pg_array_parse(&mut array, pg_array, pg_type, ',').unwrap();
+        Value::Array(array)
+    }
+
     #[test]
     fn test_atomic_types_parse() {
+        assert_eq!(pg_text_to_json("foo", &Type::TEXT), json!("foo"));
+        assert_eq!(pg_text_to_json("42", &Type::INT4), json!(42));
+        assert_eq!(pg_text_to_json("42", &Type::INT2), json!(42));
+        assert_eq!(pg_text_to_json("42", &Type::INT8), json!("42"));
+        assert_eq!(pg_text_to_json("42.42", &Type::FLOAT8), json!(42.42));
+        assert_eq!(pg_text_to_json("42.42", &Type::FLOAT4), json!(42.42));
+        assert_eq!(pg_text_to_json("NaN", &Type::FLOAT4), json!("NaN"));
         assert_eq!(
-            pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
-            json!("foo")
-        );
-        assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
-        assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
-        assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
-        assert_eq!(
-            pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
-            json!("42")
-        );
-        assert_eq!(
-            pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
-            json!(42.42)
-        );
-        assert_eq!(
-            pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
-            json!(42.42)
-        );
-        assert_eq!(
-            pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
-            json!("NaN")
-        );
-        assert_eq!(
-            pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
+            pg_text_to_json("Infinity", &Type::FLOAT4),
             json!("Infinity")
         );
         assert_eq!(
-            pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
+            pg_text_to_json("-Infinity", &Type::FLOAT4),
             json!("-Infinity")
         );
 
@@ -355,10 +474,9 @@ mod tests {
             .unwrap();
         assert_eq!(
             pg_text_to_json(
-                Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
+                r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#,
                 &Type::JSONB
-            )
-            .unwrap(),
+            ),
             json
         );
     }
@@ -366,7 +484,7 @@ mod tests {
     #[test]
     fn test_pg_array_parse_text() {
         fn pt(pg_arr: &str) -> Value {
-            pg_array_parse(pg_arr, &Type::TEXT).unwrap()
+            pg_array_parse(pg_arr, &Type::TEXT)
         }
         assert_eq!(
            pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
@@ -389,7 +507,7 @@ mod tests {
     #[test]
     fn test_pg_array_parse_bool() {
         fn pb(pg_arr: &str) -> Value {
-            pg_array_parse(pg_arr, &Type::BOOL).unwrap()
+            pg_array_parse(pg_arr, &Type::BOOL)
         }
         assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
         assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
@@ -406,7 +524,7 @@ mod tests {
     #[test]
     fn test_pg_array_parse_numbers() {
         fn pn(pg_arr: &str, ty: &Type) -> Value {
-            pg_array_parse(pg_arr, ty).unwrap()
+            pg_array_parse(pg_arr, ty)
         }
         assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
         assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
@@ -434,7 +552,7 @@ mod tests {
     #[test]
     fn test_pg_array_with_decoration() {
         fn p(pg_arr: &str) -> Value {
-            pg_array_parse(pg_arr, &Type::INT2).unwrap()
+            pg_array_parse(pg_arr, &Type::INT2)
        }
         assert_eq!(
             p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
@@ -445,7 +563,7 @@ mod tests {
     #[test]
     fn test_pg_array_parse_json() {
         fn pt(pg_arr: &str) -> Value {
-            pg_array_parse(pg_arr, &Type::JSONB).unwrap()
+            pg_array_parse(pg_arr, &Type::JSONB)
         }
         assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
         assert_eq!(
diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs
index 5d5e7bf83e..18ce03c725 100644
--- a/proxy/src/serverless/sql_over_http.rs
+++ b/proxy/src/serverless/sql_over_http.rs
@@ -1135,7 +1135,6 @@ async fn query_to_json(
 
     let columns_len = row_stream.statement.columns().len();
     let mut fields = Vec::with_capacity(columns_len);
-    let mut types = Vec::with_capacity(columns_len);
 
     for c in row_stream.statement.columns() {
         fields.push(json!({
@@ -1147,8 +1146,6 @@ async fn query_to_json(
             "dataTypeModifier": c.type_modifier(),
             "format": "text",
         }));
-
-        types.push(c.type_().clone());
     }
 
     let raw_output = parsed_headers.raw_output;
@@ -1170,7 +1167,7 @@ async fn query_to_json(
         ));
     }
 
-    let row = pg_text_row_to_json(&row, &types, raw_output, array_mode)?;
+    let row = pg_text_row_to_json(&row, raw_output, array_mode)?;
     rows.push(row);
 
     // assumption: parsing pg text and converting to json takes CPU time.
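
For illustration only (not one of the committed test cases): a minimal sketch of the
conversions the rewritten parser performs, written against the `pg_array_parse` test
helper defined in `mod tests` above. The input literal is an assumed example chosen to
exercise quoting, backslash escapes, unquoted NULL, and nesting.

    // Quoted items are un-escaped, an unquoted NULL becomes JSON null, and a
    // nested `{...}` becomes a nested JSON array; with element type TEXT the
    // scalar items stay strings.
    assert_eq!(
        pg_array_parse(r#"{"a\"b",NULL,{1,2}}"#, &Type::TEXT),
        json!(["a\"b", null, ["1", "2"]])
    );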