Compare commits

..

5 Commits

Author SHA1 Message Date
Conrad Ludgate
4345fdf07b completely rewrite array text parsing based on spec 2025-05-26 12:35:39 +01:00
Conrad Ludgate
19461604ae move array initialisation out of pg_array_parse 2025-05-26 11:01:57 +01:00
Conrad Ludgate
ef9a5785b0 move value write inside pg_text_to_json 2025-05-26 10:56:28 +01:00
Conrad Ludgate
b339daed9b manage vec/map manually, and handle nulls separately 2025-05-26 10:51:22 +01:00
Conrad Ludgate
85233a85a6 no longer contruct column types vec 2025-05-26 10:35:46 +01:00
6 changed files with 345 additions and 269 deletions

View File

@@ -10,7 +10,6 @@ pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// TODO: gRPC is disabled by default for now, but the port is used in neon_local.
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
pub const DEFAULT_GRPC_LISTEN_TLS: bool = false; // TODO: enable by default?
use std::collections::HashMap;
use std::num::{NonZeroU64, NonZeroUsize};
@@ -108,7 +107,6 @@ pub struct ConfigToml {
pub listen_http_addr: String,
pub listen_https_addr: Option<String>,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_tls: bool,
pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf,
#[serde(with = "humantime_serde")]
@@ -595,7 +593,6 @@ impl Default for ConfigToml {
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
listen_https_addr: (None),
listen_grpc_addr: None, // TODO: default to 127.0.0.1:51051
listen_grpc_tls: DEFAULT_GRPC_LISTEN_TLS,
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60),

View File

@@ -483,9 +483,7 @@ fn start_pageserver(
grpc_auth = None;
}
let tls_server_config = if conf.listen_https_addr.is_some()
|| conf.enable_tls_page_service_api
|| conf.listen_grpc_tls
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
{
let resolver = BACKGROUND_RUNTIME.block_on(ReloadingCertificateResolver::new(
"main",
@@ -793,9 +791,11 @@ fn start_pageserver(
tokio::net::TcpListener::from_std(pageserver_listener)
.context("create tokio listener")?
},
conf.enable_tls_page_service_api
.then(|| tls_server_config.clone())
.flatten(),
if conf.enable_tls_page_service_api {
tls_server_config
} else {
None
},
basebackup_cache.clone(),
);
@@ -813,9 +813,6 @@ fn start_pageserver(
grpc_auth,
otel_guard.as_ref().map(|g| g.dispatch.clone()),
grpc_listener,
conf.listen_grpc_tls
.then(|| tls_server_config.clone())
.flatten(),
basebackup_cache,
)?);
}

View File

@@ -63,8 +63,6 @@ pub struct PageServerConf {
///
/// EXPERIMENTAL: this protocol is unstable and under active development.
pub listen_grpc_addr: Option<String>,
/// If true, enable TLS for the gRPC server, using ssl_key_file and ssl_cert_file.
pub listen_grpc_tls: bool,
/// Path to a file with certificate's private key for https and gRPC API.
/// Default: server.key
@@ -230,7 +228,7 @@ pub struct PageServerConf {
pub tracing: Option<pageserver_api::config::Tracing>,
/// Enable TLS in the libpq page service API.
/// Enable TLS in page service API.
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
pub enable_tls_page_service_api: bool,
@@ -365,7 +363,6 @@ impl PageServerConf {
listen_http_addr,
listen_https_addr,
listen_grpc_addr,
listen_grpc_tls,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,
@@ -436,7 +433,6 @@ impl PageServerConf {
listen_http_addr,
listen_https_addr,
listen_grpc_addr,
listen_grpc_tls,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,

View File

@@ -13,10 +13,9 @@ use std::{io, str};
use anyhow::{Context, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::{FutureExt, Stream, StreamExt as _};
use futures::{FutureExt, Stream};
use itertools::Itertools;
use jsonwebtoken::TokenData;
use nix::sys::socket::{setsockopt, sockopt};
use once_cell::sync::OnceCell;
use pageserver_api::config::{
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
@@ -43,8 +42,6 @@ use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_rustls::TlsAcceptor;
use tokio_stream::wrappers::TcpListenerStream;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::{Claims, Scope, SwappableJwtAuth};
@@ -72,7 +69,7 @@ use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::task_mgr::{COMPUTE_REQUEST_RUNTIME, TaskKind, exit_on_panic_or_error};
use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
use crate::tenant::mgr::{
GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
};
@@ -90,9 +87,10 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log slow GetPage requests.
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
/// Whether to enable TCP keepalives for gRPC connections. The interval and
/// timeouts are configured via sysctl. This detects dead connections sooner.
const GRPC_TCP_KEEPALIVE: bool = true;
/// The idle time before sending TCP keepalive probes for gRPC connections. The
/// interval and timeout between each probe is configured via sysctl. This
/// allows detecting dead connections sooner.
const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
/// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
/// algorithm, which can cause latency spikes for small messages.
@@ -142,7 +140,7 @@ pub fn spawn(
// accept connections.)
DownloadBehavior::Error,
);
let task = COMPUTE_REQUEST_RUNTIME.spawn(exit_on_panic_or_error(
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"libpq listener",
libpq_listener_main(
conf,
@@ -164,18 +162,17 @@ pub fn spawn(
}
/// Spawns a gRPC server for the page service.
///
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn_grpc(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
listener: std::net::TcpListener,
tls_config: Option<Arc<rustls::ServerConfig>>,
basebackup_cache: Arc<BasebackupCache>,
) -> anyhow::Result<CancellableTask> {
// Use the compute runtime.
let _runtime = COMPUTE_REQUEST_RUNTIME.enter();
let cancel = CancellationToken::new();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
@@ -183,9 +180,18 @@ pub fn spawn_grpc(
.detached_child();
let gate = Gate::default();
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
// port early during startup.
let incoming = {
let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
listener.set_nonblocking(true)?;
tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?)
.with_nodelay(Some(GRPC_TCP_NODELAY))
.with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
};
// Set up the gRPC server.
//
// NB: does not respect TCP settings, since we configure the socket manually.
// TODO: consider tuning window sizes.
// TODO: wire up tracing.
let mut server = tonic::transport::Server::builder()
@@ -213,55 +219,21 @@ pub fn spawn_grpc(
.build_v1()?;
let server = server.add_service(reflection_service);
// Set up the TCP socket. We take a preconfigured TcpListener to bind the port early.
listener.set_nonblocking(true)?;
setsockopt(&listener, sockopt::KeepAlive, &GRPC_TCP_KEEPALIVE)?;
let listener = tokio::net::TcpListener::from_std(listener)?;
// Build the serve future.
let cancel_serve = cancel.clone();
let serve = async move {
// Accept TCP connections.
let tcp_conns = TcpListenerStream::new(listener).map(|result| {
let tcp_conn = result.inspect_err(|err| error!("TCP accept failed: {err}"))?;
tcp_conn.set_nodelay(GRPC_TCP_NODELAY).inspect_err(|err| {
error!("TCP nodelay failed: {err}");
})?;
Ok(tcp_conn)
});
if let Some(tls_config) = tls_config {
// If TLS is enabled, decrypt the TCP streams before passing them to the server.
let tls_acceptor = TlsAcceptor::from(tls_config);
let tls_conns = async_stream::stream! {
for await result in tcp_conns {
match result {
Ok(tcp_conn) => yield tls_acceptor
.accept(tcp_conn)
.await
.inspect_err(|err| error!("TLS handshake failed: {err}")),
Err(err) => yield Err(err),
}
}
};
server
.serve_with_incoming_shutdown(tls_conns, cancel_serve.cancelled())
.await?;
} else {
// Otherwise, just pass the plaintext TCP streams.
server
.serve_with_incoming_shutdown(tcp_conns, cancel_serve.cancelled())
.await?;
}
// Clean shutdown, wait for tasks to finish.
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
anyhow::Ok(())
};
// Spawn a task to run the serve future.
let task = tokio::spawn(exit_on_panic_or_error("grpc listener", serve));
// Spawn server task.
let task_cancel = cancel.clone();
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"grpc listener",
async move {
let result = server
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
.await;
if result.is_ok() {
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
}
result
},
));
Ok(CancellableTask { task, cancel })
}

View File

@@ -70,6 +70,34 @@ pub(crate) enum JsonConversionError {
ParseJsonError(#[from] serde_json::Error),
#[error("unbalanced array")]
UnbalancedArray,
#[error("unbalanced quoted string")]
UnbalancedString,
}
enum OutputMode {
Array(Vec<Value>),
Object(Map<String, Value>),
}
impl OutputMode {
fn key(&mut self, key: &str) -> &mut Value {
match self {
OutputMode::Array(values) => push_entry(values, Value::Null),
OutputMode::Object(map) => map.entry(key.to_string()).or_insert(Value::Null),
}
}
fn finish(self) -> Value {
match self {
OutputMode::Array(values) => Value::Array(values),
OutputMode::Object(map) => Value::Object(map),
}
}
}
fn push_entry<T>(arr: &mut Vec<T>, t: T) -> &mut T {
arr.push(t);
arr.last_mut().expect("a value was just inserted")
}
//
@@ -77,182 +105,276 @@ pub(crate) enum JsonConversionError {
//
pub(crate) fn pg_text_row_to_json(
row: &Row,
columns: &[Type],
raw_output: bool,
array_mode: bool,
) -> Result<Value, JsonConversionError> {
let iter = row
.columns()
.iter()
.zip(columns)
.enumerate()
.map(|(i, (column, typ))| {
let name = column.name();
let pg_value = row.as_text(i).map_err(JsonConversionError::AsTextError)?;
let json_value = if raw_output {
match pg_value {
Some(v) => Value::String(v.to_string()),
None => Value::Null,
}
} else {
pg_text_to_json(pg_value, typ)?
};
Ok((name.to_string(), json_value))
});
if array_mode {
// drop keys and aggregate into array
let arr = iter
.map(|r| r.map(|(_key, val)| val))
.collect::<Result<Vec<Value>, JsonConversionError>>()?;
Ok(Value::Array(arr))
let mut entries = if array_mode {
OutputMode::Array(Vec::with_capacity(row.columns().len()))
} else {
let obj = iter.collect::<Result<Map<String, Value>, JsonConversionError>>()?;
Ok(Value::Object(obj))
OutputMode::Object(Map::with_capacity(row.columns().len()))
};
for (i, column) in row.columns().iter().enumerate() {
let pg_value = row.as_text(i).map_err(JsonConversionError::AsTextError)?;
let value = entries.key(column.name());
match pg_value {
Some(v) if raw_output => *value = Value::String(v.to_string()),
Some(v) => pg_text_to_json(value, v, column.type_())?,
None => *value = Value::Null,
}
}
Ok(entries.finish())
}
//
// Convert postgres text-encoded value to JSON value
//
fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, JsonConversionError> {
if let Some(val) = pg_value {
if let Kind::Array(elem_type) = pg_type.kind() {
return pg_array_parse(val, elem_type);
}
fn pg_text_to_json(
output: &mut Value,
val: &str,
pg_type: &Type,
) -> Result<(), JsonConversionError> {
if let Kind::Array(elem_type) = pg_type.kind() {
// todo: we should fetch this from postgres.
let delimiter = ',';
match *pg_type {
Type::BOOL => Ok(Value::Bool(val == "t")),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
Ok(Value::Number(serde_json::Number::from(val)))
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
Ok(Value::Number(num))
} else {
// Pass Nan, Inf, -Inf as strings
// JS JSON.stringify() does converts them to null, but we
// want to preserve them, so we pass them as strings
Ok(Value::String(val.to_string()))
}
}
Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
_ => Ok(Value::String(val.to_string())),
}
} else {
Ok(Value::Null)
}
}
//
// Parse postgres array into JSON array.
//
// This is a bit involved because we need to handle nested arrays and quoted
// values. Unlike postgres we don't check that all nested arrays have the same
// dimensions, we just return them as is.
//
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, JsonConversionError> {
pg_array_parse_inner(pg_array, elem_type, false).map(|(v, _)| v)
}
fn pg_array_parse_inner(
pg_array: &str,
elem_type: &Type,
nested: bool,
) -> Result<(Value, usize), JsonConversionError> {
let mut pg_array_chr = pg_array.char_indices();
let mut level = 0;
let mut quote = false;
let mut entries: Vec<Value> = Vec::new();
let mut entry = String::new();
// skip bounds decoration
if let Some('[') = pg_array.chars().next() {
for (_, c) in pg_array_chr.by_ref() {
if c == '=' {
break;
}
}
let mut array = vec![];
pg_array_parse(&mut array, val, elem_type, delimiter)?;
*output = Value::Array(array);
return Ok(());
}
fn push_checked(
entry: &mut String,
entries: &mut Vec<Value>,
elem_type: &Type,
) -> Result<(), JsonConversionError> {
if !entry.is_empty() {
// While in usual postgres response we get nulls as None and everything else
// as Some(&str), in arrays we get NULL as unquoted 'NULL' string (while
// string with value 'NULL' will be represented by '"NULL"'). So catch NULLs
// here while we have quotation info and convert them to None.
if entry == "NULL" {
entries.push(pg_text_to_json(None, elem_type)?);
match *pg_type {
Type::BOOL => *output = Value::Bool(val == "t"),
Type::INT2 | Type::INT4 => {
let val = val.parse::<i32>()?;
*output = Value::Number(serde_json::Number::from(val));
}
Type::FLOAT4 | Type::FLOAT8 => {
let fval = val.parse::<f64>()?;
let num = serde_json::Number::from_f64(fval);
if let Some(num) = num {
*output = Value::Number(num);
} else {
entries.push(pg_text_to_json(Some(entry), elem_type)?);
// Pass Nan, Inf, -Inf as strings
// JS JSON.stringify() does converts them to null, but we
// want to preserve them, so we pass them as strings
*output = Value::String(val.to_string());
}
entry.clear();
}
Ok(())
Type::JSON | Type::JSONB => *output = serde_json::from_str(val)?,
_ => *output = Value::String(val.to_string()),
}
while let Some((mut i, mut c)) = pg_array_chr.next() {
let mut escaped = false;
Ok(())
}
if c == '\\' {
escaped = true;
let Some(x) = pg_array_chr.next() else {
return Err(JsonConversionError::UnbalancedArray);
};
(i, c) = x;
}
match c {
'{' if !quote => {
level += 1;
if level > 1 {
let (res, off) = pg_array_parse_inner(&pg_array[i..], elem_type, true)?;
entries.push(res);
for _ in 0..off - 1 {
pg_array_chr.next();
}
}
}
'}' if !quote => {
level -= 1;
if level == 0 {
push_checked(&mut entry, &mut entries, elem_type)?;
if nested {
return Ok((Value::Array(entries), i));
}
}
}
'"' if !escaped => {
if quote {
// end of quoted string, so push it manually without any checks
// for emptiness or nulls
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
entry.clear();
}
quote = !quote;
}
',' if !quote => {
push_checked(&mut entry, &mut entries, elem_type)?;
}
_ => {
entry.push(c);
}
}
/// Parse postgres array into JSON array.
///
/// This is a bit involved because we need to handle nested arrays and quoted
/// values. Unlike postgres we don't check that all nested arrays have the same
/// dimensions, we just return them as is.
///
/// <https://www.postgresql.org/docs/current/arrays.html#ARRAYS-IO>
///
/// The external text representation of an array value consists of items that are interpreted
/// according to the I/O conversion rules for the array's element type, plus decoration that
/// indicates the array structure. The decoration consists of curly braces (`{` and `}`) around
/// the array value plus delimiter characters between adjacent items. The delimiter character
/// is usually a comma (,) but can be something else: it is determined by the typdelim setting
/// for the array's element type. Among the standard data types provided in the PostgreSQL
/// distribution, all use a comma, except for type box, which uses a semicolon (;).
///
/// In a multidimensional array, each dimension (row, plane, cube, etc.)
/// gets its own level of curly braces, and delimiters must be written between adjacent
/// curly-braced entities of the same level.
fn pg_array_parse(
elements: &mut Vec<Value>,
mut pg_array: &str,
elem: &Type,
delim: char,
) -> Result<(), JsonConversionError> {
// skip bounds decoration, eg:
// `[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}`
// technically these are significant, but we have no way to represent them in json.
if let Some('[') = pg_array.chars().next() {
let Some((_bounds, array)) = pg_array.split_once('=') else {
return Err(JsonConversionError::UnbalancedArray);
};
pg_array = array;
}
if level != 0 {
// whitespace might preceed a `{`.
let pg_array = pg_array.trim_start();
let rest = pg_array_parse_inner(elements, pg_array, elem, delim)?;
if !rest.is_empty() {
return Err(JsonConversionError::UnbalancedArray);
}
Ok((Value::Array(entries), 0))
Ok(())
}
/// reads a single array from the `pg_array` string and pushes each values to `elements`.
/// returns the rest of the `pg_array` string that was not read.
fn pg_array_parse_inner<'a>(
elements: &mut Vec<Value>,
mut pg_array: &'a str,
elem: &Type,
delim: char,
) -> Result<&'a str, JsonConversionError> {
// array should have a `{` prefix.
pg_array = pg_array
.strip_prefix('{')
.ok_or(JsonConversionError::UnbalancedArray)?;
let mut q = String::new();
loop {
let value = push_entry(elements, Value::Null);
pg_array = pg_array_parse_item(value, &mut q, pg_array, elem, delim)?;
// check for separator.
if let Some(next) = pg_array.strip_prefix(delim) {
// next item.
pg_array = next;
} else {
break;
}
}
let Some(next) = pg_array.strip_prefix('}') else {
// missing `}` terminator.
return Err(JsonConversionError::UnbalancedArray);
};
// whitespace might follow a `}`.
Ok(next.trim_start())
}
/// reads a single item from the `pg_array` string.
/// returns the rest of the `pg_array` string that was not read.
///
/// `quoted` is a scratch allocation that has no defined output.
fn pg_array_parse_item<'a>(
output: &mut Value,
quoted: &mut String,
mut pg_array: &'a str,
elem: &Type,
delim: char,
) -> Result<&'a str, JsonConversionError> {
// We are trying to parse an array item.
// This could be a new array, if this is a multi-dimentional array.
// This could be a quoted string representing `elem`.
// This could be an unquoted string representing `elem`.
// whitespace might preceed an item.
pg_array = pg_array.trim_start();
if pg_array.strip_prefix('{').is_some() {
// nested array.
let mut nested = vec![];
pg_array = pg_array_parse_inner(&mut nested, pg_array, elem, delim)?;
*output = Value::Array(nested);
return Ok(pg_array);
}
if let Some(mut pg_array) = pg_array.strip_prefix('"') {
pg_array = pg_array_parse_quoted(quoted, pg_array)?;
// we have unquoted an item string:
pg_text_to_json(output, quoted, elem)?;
quoted.clear();
return Ok(pg_array);
}
// we need to parse an item. read until we find a delimiter or `}`.
let index = pg_array
.find([delim, '}'])
.ok_or(JsonConversionError::UnbalancedArray)?;
let item;
(item, pg_array) = pg_array.split_at(index);
// item might have trailing whitespace that we need to ignore.
let item = item.trim_end();
// we might have an item string:
// check for null
if item == "NULL" {
*output = Value::Null;
} else {
pg_text_to_json(output, item, elem)?;
}
Ok(pg_array)
}
/// reads a single quoted item from the `pg_array` string.
///
/// Returns the rest of the `pg_array` string that was not read.
/// The output is written into `quoted`.
///
/// The pg_array string must have a `"` terminator, but the `"` initial value
/// must have already been removed from the input. The terminator is removed.
fn pg_array_parse_quoted<'a>(
quoted: &mut String,
mut pg_array: &'a str,
) -> Result<&'a str, JsonConversionError> {
// The array output routine will put double quotes around element values if they are empty strings,
// contain curly braces, delimiter characters, double quotes, backslashes, or white space,
// or match the word `NULL`. Double quotes and backslashes embedded in element values will be backslash-escaped.
// For numeric data types it is safe to assume that double quotes will never appear,
// but for textual data types one should be prepared to cope with either the presence or absence of quotes.
// We write to quoted in chunks terminated by an escape character.
// Eg if we have the input `foo\"bar"`, then we write `foo`, then `"`, then finally `bar`.
loop {
// we need to parse an chunk. read until we find a '\\' or `"`.
let i = pg_array
.find(['\\', '"'])
.ok_or(JsonConversionError::UnbalancedString)?;
let chunk: &str;
(chunk, pg_array) = pg_array
.split_at_checked(i)
.expect("i is guaranteed to be in-bounds of pg_array");
// push the chunk.
quoted.push_str(chunk);
// consume the chunk_end character.
let chunk_end: char;
(chunk_end, pg_array) =
split_first_char(pg_array).expect("pg_array should start with either '\\\\' or '\"'");
// finished.
if chunk_end == '"' {
// whitespace might follow the '"'.
pg_array = pg_array.trim_start();
break Ok(pg_array);
}
// consume the escaped character.
let escaped: char;
(escaped, pg_array) =
split_first_char(pg_array).ok_or(JsonConversionError::UnbalancedString)?;
quoted.push(escaped);
}
}
fn split_first_char(s: &str) -> Option<(char, &str)> {
let mut chars = s.chars();
let c = chars.next()?;
Some((c, chars.as_str()))
}
#[cfg(test)]
@@ -316,37 +438,33 @@ mod tests {
);
}
fn pg_text_to_json(val: &str, pg_type: &Type) -> Value {
let mut v = Value::Null;
super::pg_text_to_json(&mut v, val, pg_type).unwrap();
v
}
fn pg_array_parse(pg_array: &str, pg_type: &Type) -> Value {
let mut array = vec![];
super::pg_array_parse(&mut array, pg_array, pg_type, ',').unwrap();
Value::Array(array)
}
#[test]
fn test_atomic_types_parse() {
assert_eq!(pg_text_to_json("foo", &Type::TEXT), json!("foo"));
assert_eq!(pg_text_to_json("42", &Type::INT4), json!(42));
assert_eq!(pg_text_to_json("42", &Type::INT2), json!(42));
assert_eq!(pg_text_to_json("42", &Type::INT8), json!("42"));
assert_eq!(pg_text_to_json("42.42", &Type::FLOAT8), json!(42.42));
assert_eq!(pg_text_to_json("42.42", &Type::FLOAT4), json!(42.42));
assert_eq!(pg_text_to_json("NaN", &Type::FLOAT4), json!("NaN"));
assert_eq!(
pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
json!("foo")
);
assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
assert_eq!(
pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
json!("42")
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
json!(42.42)
);
assert_eq!(
pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
json!("NaN")
);
assert_eq!(
pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
pg_text_to_json("Infinity", &Type::FLOAT4),
json!("Infinity")
);
assert_eq!(
pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
pg_text_to_json("-Infinity", &Type::FLOAT4),
json!("-Infinity")
);
@@ -355,10 +473,9 @@ mod tests {
.unwrap();
assert_eq!(
pg_text_to_json(
Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#,
&Type::JSONB
)
.unwrap(),
),
json
);
}
@@ -366,7 +483,7 @@ mod tests {
#[test]
fn test_pg_array_parse_text() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::TEXT).unwrap()
pg_array_parse(pg_arr, &Type::TEXT)
}
assert_eq!(
pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
@@ -389,7 +506,7 @@ mod tests {
#[test]
fn test_pg_array_parse_bool() {
fn pb(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::BOOL).unwrap()
pg_array_parse(pg_arr, &Type::BOOL)
}
assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
@@ -406,7 +523,7 @@ mod tests {
#[test]
fn test_pg_array_parse_numbers() {
fn pn(pg_arr: &str, ty: &Type) -> Value {
pg_array_parse(pg_arr, ty).unwrap()
pg_array_parse(pg_arr, ty)
}
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
@@ -434,7 +551,7 @@ mod tests {
#[test]
fn test_pg_array_with_decoration() {
fn p(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::INT2).unwrap()
pg_array_parse(pg_arr, &Type::INT2)
}
assert_eq!(
p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
@@ -445,7 +562,7 @@ mod tests {
#[test]
fn test_pg_array_parse_json() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
pg_array_parse(pg_arr, &Type::JSONB)
}
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
assert_eq!(

View File

@@ -1102,7 +1102,6 @@ async fn query_to_json<T: GenericClient>(
let columns_len = row_stream.statement.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut types = Vec::with_capacity(columns_len);
for c in row_stream.statement.columns() {
fields.push(json!({
@@ -1114,8 +1113,6 @@ async fn query_to_json<T: GenericClient>(
"dataTypeModifier": c.type_modifier(),
"format": "text",
}));
types.push(c.type_().clone());
}
let raw_output = parsed_headers.raw_output;
@@ -1137,7 +1134,7 @@ async fn query_to_json<T: GenericClient>(
));
}
let row = pg_text_row_to_json(&row, &types, raw_output, array_mode)?;
let row = pg_text_row_to_json(&row, raw_output, array_mode)?;
rows.push(row);
// assumption: parsing pg text and converting to json takes CPU time.