mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-22 12:52:55 +00:00
Compare commits
15 Commits
arpad/blob
...
proxy-refa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4673dd6d29 | ||
|
|
37f5a6434b | ||
|
|
6588edd693 | ||
|
|
973eb69cd3 | ||
|
|
8bb2127a19 | ||
|
|
b5ad693a87 | ||
|
|
5a9138a764 | ||
|
|
1466767571 | ||
|
|
f11254f2c5 | ||
|
|
4529b463b5 | ||
|
|
a8d4634191 | ||
|
|
53de382533 | ||
|
|
05f7fc4a06 | ||
|
|
6946325596 | ||
|
|
b41070ba53 |
@@ -42,6 +42,7 @@ COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_i
|
||||
COPY --chown=nonroot . .
|
||||
|
||||
ARG ADDITIONAL_RUSTFLAGS
|
||||
ENV _RJEM_MALLOC_CONF="thp:never"
|
||||
RUN set -e \
|
||||
&& PQ_LIB_DIR=$(pwd)/pg_install/v${STABLE_PG_VERSION}/lib RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment ${ADDITIONAL_RUSTFLAGS}" cargo build \
|
||||
--bin pg_sni_router \
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::{
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
@@ -147,14 +148,15 @@ impl JwkCacheEntryLock {
|
||||
Err(e) => tracing::warn!(url=?rule.jwks_url, error=?e, "could not fetch JWKs"),
|
||||
Ok(r) => {
|
||||
let resp: http::Response<reqwest::Body> = r.into();
|
||||
match parse_json_body_with_limit::<jose_jwk::JwkSet>(
|
||||
match parse_json_body_with_limit::<jose_jwk::JwkSet, _>(
|
||||
PhantomData,
|
||||
resp.into_body(),
|
||||
MAX_JWK_BODY_SIZE,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(e) => {
|
||||
tracing::warn!(url=?rule.jwks_url, error=?e, "could not decode JWKs");
|
||||
tracing::warn!(url=?rule.jwks_url, error=%e, "could not decode JWKs");
|
||||
}
|
||||
Ok(jwks) => {
|
||||
key_sets.insert(
|
||||
|
||||
@@ -297,6 +297,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
build_tag: BUILD_TAG,
|
||||
});
|
||||
|
||||
proxy::jemalloc::inspect_thp()?;
|
||||
let jemalloc = match proxy::jemalloc::MetricRecorder::new() {
|
||||
Ok(t) => Some(t),
|
||||
Err(e) => {
|
||||
|
||||
@@ -6,11 +6,10 @@ pub mod health_server;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::bail;
|
||||
use bytes::Bytes;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper1::body::Body;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::de::DeserializeSeed;
|
||||
|
||||
pub(crate) use reqwest::{Request, Response};
|
||||
pub(crate) use reqwest_middleware::{ClientWithMiddleware, Error};
|
||||
@@ -113,10 +112,21 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn parse_json_body_with_limit<D: DeserializeOwned>(
|
||||
mut b: impl Body<Data = Bytes, Error = reqwest::Error> + Unpin,
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum ReadPayloadError<E> {
|
||||
#[error("could not read the HTTP body: {0}")]
|
||||
Read(E),
|
||||
#[error("could not parse the HTTP body: {0}")]
|
||||
Parse(#[from] serde_json::Error),
|
||||
#[error("could not parse the HTTP body: content length exceeds limit of {0} bytes")]
|
||||
LengthExceeded(usize),
|
||||
}
|
||||
|
||||
pub(crate) async fn parse_json_body_with_limit<D, E>(
|
||||
seed: impl for<'de> DeserializeSeed<'de, Value = D>,
|
||||
mut b: impl Body<Data = Bytes, Error = E> + Unpin,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<D> {
|
||||
) -> Result<D, ReadPayloadError<E>> {
|
||||
// We could use `b.limited().collect().await.to_bytes()` here
|
||||
// but this ends up being slightly more efficient as far as I can tell.
|
||||
|
||||
@@ -124,20 +134,25 @@ pub(crate) async fn parse_json_body_with_limit<D: DeserializeOwned>(
|
||||
// in reqwest, this value is influenced by the Content-Length header.
|
||||
let lower_bound = match usize::try_from(b.size_hint().lower()) {
|
||||
Ok(bound) if bound <= limit => bound,
|
||||
_ => bail!("Content length exceeds limit of {limit} bytes"),
|
||||
_ => return Err(ReadPayloadError::LengthExceeded(limit)),
|
||||
};
|
||||
let mut bytes = Vec::with_capacity(lower_bound);
|
||||
|
||||
while let Some(frame) = b.frame().await.transpose()? {
|
||||
while let Some(frame) = b
|
||||
.frame()
|
||||
.await
|
||||
.transpose()
|
||||
.map_err(ReadPayloadError::Read)?
|
||||
{
|
||||
if let Ok(data) = frame.into_data() {
|
||||
if bytes.len() + data.len() > limit {
|
||||
bail!("Content length exceeds limit of {limit} bytes")
|
||||
return Err(ReadPayloadError::LengthExceeded(limit));
|
||||
}
|
||||
bytes.extend_from_slice(&data);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(serde_json::from_slice::<D>(&bytes)?)
|
||||
Ok(seed.deserialize(&mut serde_json::Deserializer::from_slice(&bytes))?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -9,7 +9,8 @@ use measured::{
|
||||
text::TextEncoder,
|
||||
LabelGroup, MetricGroup,
|
||||
};
|
||||
use tikv_jemalloc_ctl::{config, epoch, epoch_mib, stats, version};
|
||||
use tikv_jemalloc_ctl::{config, epoch, epoch_mib, stats, version, Access, AsName, Name};
|
||||
use tracing::info;
|
||||
|
||||
pub struct MetricRecorder {
|
||||
epoch: epoch_mib,
|
||||
@@ -114,3 +115,10 @@ jemalloc_gauge!(mapped, mapped_mib);
|
||||
jemalloc_gauge!(metadata, metadata_mib);
|
||||
jemalloc_gauge!(resident, resident_mib);
|
||||
jemalloc_gauge!(retained, retained_mib);
|
||||
|
||||
pub fn inspect_thp() -> Result<(), tikv_jemalloc_ctl::Error> {
|
||||
let opt_thp: &Name = c"opt.thp".to_bytes_with_nul().name();
|
||||
let s: &str = opt_thp.read()?;
|
||||
info!("jemalloc opt.thp {s}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,18 +1,534 @@
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Range;
|
||||
|
||||
use itertools::Itertools;
|
||||
use serde::de;
|
||||
use serde::de::DeserializeSeed;
|
||||
use serde::Deserialize;
|
||||
use serde::Deserializer;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
use tokio_postgres::types::Kind;
|
||||
use tokio_postgres::types::Type;
|
||||
use tokio_postgres::Row;
|
||||
|
||||
//
|
||||
// Convert json non-string types to strings, so that they can be passed to Postgres
|
||||
// as parameters.
|
||||
//
|
||||
pub(crate) fn json_to_pg_text(json: Vec<Value>) -> Vec<Option<String>> {
|
||||
json.iter().map(json_value_to_pg_text).collect()
|
||||
use super::sql_over_http::BatchQueryData;
|
||||
use super::sql_over_http::Payload;
|
||||
use super::sql_over_http::QueryData;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct Slice {
|
||||
pub start: u32,
|
||||
pub len: u32,
|
||||
}
|
||||
|
||||
fn json_value_to_pg_text(value: &Value) -> Option<String> {
|
||||
impl Slice {
|
||||
pub fn into_range(self) -> Range<usize> {
|
||||
let start = self.start as usize;
|
||||
let end = start + self.len as usize;
|
||||
start..end
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Arena {
|
||||
pub str_arena: String,
|
||||
pub params_arena: Vec<Option<Slice>>,
|
||||
}
|
||||
|
||||
impl Arena {
|
||||
fn alloc_str(&mut self, s: &str) -> Slice {
|
||||
let start = self.str_arena.len() as u32;
|
||||
let len = s.len() as u32;
|
||||
self.str_arena.push_str(s);
|
||||
Slice { start, len }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SerdeArena<'a, T> {
|
||||
pub arena: &'a mut Arena,
|
||||
pub _t: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<'a, T> SerdeArena<'a, T> {
|
||||
fn alloc_str(&mut self, s: &str) -> Slice {
|
||||
self.arena.alloc_str(s)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'de> DeserializeSeed<'de> for SerdeArena<'a, Vec<QueryData>> {
|
||||
type Value = Vec<QueryData>;
|
||||
fn deserialize<D>(self, d: D) -> Result<Self::Value, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct VecVisitor<'a>(SerdeArena<'a, Vec<QueryData>>);
|
||||
|
||||
impl<'a, 'de> de::Visitor<'de> for VecVisitor<'a> {
|
||||
type Value = Vec<QueryData>;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("a sequence")
|
||||
}
|
||||
|
||||
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: de::SeqAccess<'de>,
|
||||
{
|
||||
let mut values = Vec::new();
|
||||
|
||||
while let Some(value) = seq.next_element_seed(SerdeArena {
|
||||
arena: &mut *self.0.arena,
|
||||
_t: PhantomData::<QueryData>,
|
||||
})? {
|
||||
values.push(value);
|
||||
}
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
}
|
||||
|
||||
d.deserialize_seq(VecVisitor(self))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'de> DeserializeSeed<'de> for SerdeArena<'a, Slice> {
|
||||
type Value = Slice;
|
||||
fn deserialize<D>(self, d: D) -> Result<Self::Value, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct Visitor<'a>(SerdeArena<'a, Slice>);
|
||||
|
||||
impl<'a, 'de> de::Visitor<'de> for Visitor<'a> {
|
||||
type Value = Slice;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("a string")
|
||||
}
|
||||
|
||||
fn visit_str<E>(mut self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
Ok(self.0.alloc_str(v))
|
||||
}
|
||||
}
|
||||
|
||||
d.deserialize_str(Visitor(self))
|
||||
}
|
||||
}
|
||||
|
||||
enum States {
|
||||
Empty,
|
||||
HasQueries(Vec<QueryData>),
|
||||
HasPartialQueryData {
|
||||
query: Option<Slice>,
|
||||
params: Option<Slice>,
|
||||
#[allow(clippy::option_option)]
|
||||
array_mode: Option<Option<bool>>,
|
||||
},
|
||||
}
|
||||
|
||||
enum Field {
|
||||
Queries,
|
||||
Query,
|
||||
Params,
|
||||
ArrayMode,
|
||||
Ignore,
|
||||
}
|
||||
|
||||
struct FieldVisitor;
|
||||
|
||||
impl<'de> de::Visitor<'de> for FieldVisitor {
|
||||
type Value = Field;
|
||||
|
||||
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.write_str(
|
||||
r#"a JSON object string of either "query", "params", "arrayMode", or "queries"."#,
|
||||
)
|
||||
}
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
self.visit_bytes(v.as_bytes())
|
||||
}
|
||||
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
match v {
|
||||
b"queries" => Ok(Field::Queries),
|
||||
b"query" => Ok(Field::Query),
|
||||
b"params" => Ok(Field::Params),
|
||||
b"arrayMode" => Ok(Field::ArrayMode),
|
||||
_ => Ok(Field::Ignore),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Field {
|
||||
#[inline]
|
||||
fn deserialize<D>(d: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
d.deserialize_identifier(FieldVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'de> DeserializeSeed<'de> for SerdeArena<'a, QueryData> {
|
||||
type Value = QueryData;
|
||||
fn deserialize<D>(self, d: D) -> Result<Self::Value, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct Visitor<'a>(SerdeArena<'a, QueryData>);
|
||||
impl<'a, 'de> de::Visitor<'de> for Visitor<'a> {
|
||||
type Value = QueryData;
|
||||
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.write_str(
|
||||
"a json object containing either a query object, or a list of query objects",
|
||||
)
|
||||
}
|
||||
#[inline]
|
||||
fn visit_map<A>(self, mut m: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: de::MapAccess<'de>,
|
||||
{
|
||||
let mut state = States::Empty;
|
||||
|
||||
while let Some(key) = m.next_key()? {
|
||||
match key {
|
||||
Field::Query => {
|
||||
let (params, array_mode) = match state {
|
||||
States::HasQueries(_) => unreachable!(),
|
||||
States::HasPartialQueryData { query: Some(_), .. } => {
|
||||
return Err(<A::Error as de::Error>::duplicate_field("query"))
|
||||
}
|
||||
States::Empty => (None, None),
|
||||
States::HasPartialQueryData {
|
||||
query: None,
|
||||
params,
|
||||
array_mode,
|
||||
} => (params, array_mode),
|
||||
};
|
||||
state = States::HasPartialQueryData {
|
||||
query: Some(m.next_value_seed(SerdeArena {
|
||||
arena: &mut *self.0.arena,
|
||||
_t: PhantomData::<Slice>,
|
||||
})?),
|
||||
params,
|
||||
array_mode,
|
||||
};
|
||||
}
|
||||
Field::Params => {
|
||||
let (query, array_mode) = match state {
|
||||
States::HasQueries(_) => unreachable!(),
|
||||
States::HasPartialQueryData {
|
||||
params: Some(_), ..
|
||||
} => {
|
||||
return Err(<A::Error as de::Error>::duplicate_field("params"))
|
||||
}
|
||||
States::Empty => (None, None),
|
||||
States::HasPartialQueryData {
|
||||
query,
|
||||
params: None,
|
||||
array_mode,
|
||||
} => (query, array_mode),
|
||||
};
|
||||
|
||||
let params = m.next_value::<PgText>()?.value;
|
||||
let start = self.0.arena.params_arena.len() as u32;
|
||||
let len = params.len() as u32;
|
||||
for param in params {
|
||||
match param {
|
||||
Some(s) => {
|
||||
let s = self.0.arena.alloc_str(&s);
|
||||
self.0.arena.params_arena.push(Some(s));
|
||||
}
|
||||
None => self.0.arena.params_arena.push(None),
|
||||
}
|
||||
}
|
||||
|
||||
state = States::HasPartialQueryData {
|
||||
query,
|
||||
params: Some(Slice { start, len }),
|
||||
array_mode,
|
||||
};
|
||||
}
|
||||
Field::ArrayMode => {
|
||||
let (query, params) = match state {
|
||||
States::HasQueries(_) => unreachable!(),
|
||||
States::HasPartialQueryData {
|
||||
array_mode: Some(_),
|
||||
..
|
||||
} => {
|
||||
return Err(<A::Error as de::Error>::duplicate_field(
|
||||
"arrayMode",
|
||||
))
|
||||
}
|
||||
States::Empty => (None, None),
|
||||
States::HasPartialQueryData {
|
||||
query,
|
||||
params,
|
||||
array_mode: None,
|
||||
} => (query, params),
|
||||
};
|
||||
state = States::HasPartialQueryData {
|
||||
query,
|
||||
params,
|
||||
array_mode: Some(m.next_value()?),
|
||||
};
|
||||
}
|
||||
Field::Queries | Field::Ignore => {
|
||||
let _ = m.next_value::<de::IgnoredAny>()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
match state {
|
||||
States::HasQueries(_) => unreachable!(),
|
||||
States::HasPartialQueryData {
|
||||
query: Some(query),
|
||||
params: Some(params),
|
||||
array_mode,
|
||||
} => Ok(QueryData {
|
||||
query,
|
||||
params,
|
||||
array_mode: array_mode.unwrap_or_default(),
|
||||
}),
|
||||
States::Empty | States::HasPartialQueryData { query: None, .. } => {
|
||||
Err(<A::Error as de::Error>::missing_field("query"))
|
||||
}
|
||||
States::HasPartialQueryData { params: None, .. } => {
|
||||
Err(<A::Error as de::Error>::missing_field("params"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Deserializer::deserialize_struct(
|
||||
d,
|
||||
"QueryData",
|
||||
&["query", "params", "arrayMode"],
|
||||
Visitor(self),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'de> DeserializeSeed<'de> for SerdeArena<'a, Payload> {
|
||||
type Value = Payload;
|
||||
fn deserialize<D>(self, d: D) -> Result<Self::Value, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct Visitor<'a>(SerdeArena<'a, Payload>);
|
||||
impl<'a, 'de> de::Visitor<'de> for Visitor<'a> {
|
||||
type Value = Payload;
|
||||
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.write_str(
|
||||
"a json object containing either a query object, or a list of query objects",
|
||||
)
|
||||
}
|
||||
#[inline]
|
||||
fn visit_map<A>(self, mut m: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: de::MapAccess<'de>,
|
||||
{
|
||||
let mut state = States::Empty;
|
||||
|
||||
while let Some(key) = m.next_key()? {
|
||||
match key {
|
||||
Field::Queries => match state {
|
||||
States::Empty => {
|
||||
state = States::HasQueries(m.next_value_seed(SerdeArena {
|
||||
arena: &mut *self.0.arena,
|
||||
_t: PhantomData::<Vec<QueryData>>,
|
||||
})?);
|
||||
}
|
||||
States::HasQueries(_) => {
|
||||
return Err(<A::Error as de::Error>::duplicate_field("queries"))
|
||||
}
|
||||
States::HasPartialQueryData { .. } => {
|
||||
return Err(<A::Error as de::Error>::unknown_field(
|
||||
"queries",
|
||||
&["query", "params", "arrayMode"],
|
||||
))
|
||||
}
|
||||
},
|
||||
Field::Query => {
|
||||
let (params, array_mode) = match state {
|
||||
States::HasQueries(_) => {
|
||||
return Err(<A::Error as de::Error>::unknown_field(
|
||||
"query",
|
||||
&["queries"],
|
||||
))
|
||||
}
|
||||
States::HasPartialQueryData { query: Some(_), .. } => {
|
||||
return Err(<A::Error as de::Error>::duplicate_field("query"))
|
||||
}
|
||||
States::Empty => (None, None),
|
||||
States::HasPartialQueryData {
|
||||
query: None,
|
||||
params,
|
||||
array_mode,
|
||||
} => (params, array_mode),
|
||||
};
|
||||
state = States::HasPartialQueryData {
|
||||
query: Some(m.next_value_seed(SerdeArena {
|
||||
arena: &mut *self.0.arena,
|
||||
_t: PhantomData::<Slice>,
|
||||
})?),
|
||||
params,
|
||||
array_mode,
|
||||
};
|
||||
}
|
||||
Field::Params => {
|
||||
let (query, array_mode) = match state {
|
||||
States::HasQueries(_) => {
|
||||
return Err(<A::Error as de::Error>::unknown_field(
|
||||
"params",
|
||||
&["queries"],
|
||||
))
|
||||
}
|
||||
States::HasPartialQueryData {
|
||||
params: Some(_), ..
|
||||
} => {
|
||||
return Err(<A::Error as de::Error>::duplicate_field("params"))
|
||||
}
|
||||
States::Empty => (None, None),
|
||||
States::HasPartialQueryData {
|
||||
query,
|
||||
params: None,
|
||||
array_mode,
|
||||
} => (query, array_mode),
|
||||
};
|
||||
|
||||
let params = m.next_value::<PgText>()?.value;
|
||||
let start = self.0.arena.params_arena.len() as u32;
|
||||
let len = params.len() as u32;
|
||||
for param in params {
|
||||
match param {
|
||||
Some(s) => {
|
||||
let s = self.0.arena.alloc_str(&s);
|
||||
self.0.arena.params_arena.push(Some(s));
|
||||
}
|
||||
None => self.0.arena.params_arena.push(None),
|
||||
}
|
||||
}
|
||||
|
||||
state = States::HasPartialQueryData {
|
||||
query,
|
||||
params: Some(Slice { start, len }),
|
||||
array_mode,
|
||||
};
|
||||
}
|
||||
Field::ArrayMode => {
|
||||
let (query, params) = match state {
|
||||
States::HasQueries(_) => {
|
||||
return Err(<A::Error as de::Error>::unknown_field(
|
||||
"arrayMode",
|
||||
&["queries"],
|
||||
))
|
||||
}
|
||||
States::HasPartialQueryData {
|
||||
array_mode: Some(_),
|
||||
..
|
||||
} => {
|
||||
return Err(<A::Error as de::Error>::duplicate_field(
|
||||
"arrayMode",
|
||||
))
|
||||
}
|
||||
States::Empty => (None, None),
|
||||
States::HasPartialQueryData {
|
||||
query,
|
||||
params,
|
||||
array_mode: None,
|
||||
} => (query, params),
|
||||
};
|
||||
state = States::HasPartialQueryData {
|
||||
query,
|
||||
params,
|
||||
array_mode: Some(m.next_value()?),
|
||||
};
|
||||
}
|
||||
Field::Ignore => {
|
||||
let _ = m.next_value::<de::IgnoredAny>()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
match state {
|
||||
States::HasQueries(queries) => Ok(Payload::Batch(BatchQueryData { queries })),
|
||||
States::HasPartialQueryData {
|
||||
query: Some(query),
|
||||
params: Some(params),
|
||||
array_mode,
|
||||
} => Ok(Payload::Single(QueryData {
|
||||
query,
|
||||
params,
|
||||
array_mode: array_mode.unwrap_or_default(),
|
||||
})),
|
||||
States::Empty | States::HasPartialQueryData { query: None, .. } => {
|
||||
Err(<A::Error as de::Error>::missing_field("query"))
|
||||
}
|
||||
States::HasPartialQueryData { params: None, .. } => {
|
||||
Err(<A::Error as de::Error>::missing_field("params"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Deserializer::deserialize_struct(
|
||||
d,
|
||||
"Payload",
|
||||
&["queries", "query", "params", "arrayMode"],
|
||||
Visitor(self),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
struct PgText {
|
||||
value: Vec<Option<String>>,
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for PgText {
|
||||
fn deserialize<D>(d: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct VecVisitor;
|
||||
|
||||
impl<'de> de::Visitor<'de> for VecVisitor {
|
||||
type Value = Vec<Option<String>>;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("a sequence of postgres parameters")
|
||||
}
|
||||
|
||||
fn visit_seq<A>(self, mut seq: A) -> Result<Vec<Option<String>>, A::Error>
|
||||
where
|
||||
A: de::SeqAccess<'de>,
|
||||
{
|
||||
let mut values = Vec::new();
|
||||
|
||||
// TODO: consider avoiding the allocations for json::Value here.
|
||||
while let Some(value) = seq.next_element()? {
|
||||
values.push(json_value_to_pg_text(value));
|
||||
}
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
}
|
||||
|
||||
let value = d.deserialize_seq(VecVisitor)?;
|
||||
|
||||
Ok(PgText { value })
|
||||
}
|
||||
}
|
||||
|
||||
fn json_value_to_pg_text(value: Value) -> Option<String> {
|
||||
match value {
|
||||
// special care for nulls
|
||||
Value::Null => None,
|
||||
@@ -21,10 +537,10 @@ fn json_value_to_pg_text(value: &Value) -> Option<String> {
|
||||
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
|
||||
|
||||
// avoid escaping here, as we pass this as a parameter
|
||||
Value::String(s) => Some(s.to_string()),
|
||||
Value::String(s) => Some(s),
|
||||
|
||||
// special care for arrays
|
||||
Value::Array(_) => json_array_to_pg_array(value),
|
||||
Value::Array(arr) => Some(json_array_to_pg_array(arr)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,7 +552,17 @@ fn json_value_to_pg_text(value: &Value) -> Option<String> {
|
||||
//
|
||||
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
|
||||
//
|
||||
fn json_array_to_pg_array(value: &Value) -> Option<String> {
|
||||
fn json_array_to_pg_array(arr: Vec<Value>) -> String {
|
||||
let vals = arr
|
||||
.into_iter()
|
||||
.map(json_array_value_to_pg_array)
|
||||
.map(|v| v.unwrap_or_else(|| "NULL".to_string()))
|
||||
.join(",");
|
||||
|
||||
format!("{{{vals}}}")
|
||||
}
|
||||
|
||||
fn json_array_value_to_pg_array(value: Value) -> Option<String> {
|
||||
match value {
|
||||
// special care for nulls
|
||||
Value::Null => None,
|
||||
@@ -44,19 +570,10 @@ fn json_array_to_pg_array(value: &Value) -> Option<String> {
|
||||
// convert to text with escaping
|
||||
// here string needs to be escaped, as it is part of the array
|
||||
v @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => Some(v.to_string()),
|
||||
v @ Value::Object(_) => json_array_to_pg_array(&Value::String(v.to_string())),
|
||||
v @ Value::Object(_) => json_array_value_to_pg_array(Value::String(v.to_string())),
|
||||
|
||||
// recurse into array
|
||||
Value::Array(arr) => {
|
||||
let vals = arr
|
||||
.iter()
|
||||
.map(json_array_to_pg_array)
|
||||
.map(|v| v.unwrap_or_else(|| "NULL".to_string()))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
||||
Some(format!("{{{vals}}}"))
|
||||
}
|
||||
Value::Array(arr) => Some(json_array_to_pg_array(arr)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -261,24 +778,22 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_atomic_types_to_pg_params() {
|
||||
let json = vec![Value::Bool(true), Value::Bool(false)];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some("true".to_owned()), Some("false".to_owned())]
|
||||
);
|
||||
let pg_params = json_value_to_pg_text(Value::Bool(true));
|
||||
assert_eq!(pg_params, Some("true".to_owned()));
|
||||
let pg_params = json_value_to_pg_text(Value::Bool(false));
|
||||
assert_eq!(pg_params, Some("false".to_owned()));
|
||||
|
||||
let json = vec![Value::Number(serde_json::Number::from(42))];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(pg_params, vec![Some("42".to_owned())]);
|
||||
let json = Value::Number(serde_json::Number::from(42));
|
||||
let pg_params = json_value_to_pg_text(json);
|
||||
assert_eq!(pg_params, Some("42".to_owned()));
|
||||
|
||||
let json = vec![Value::String("foo\"".to_string())];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(pg_params, vec![Some("foo\"".to_owned())]);
|
||||
let json = Value::String("foo\"".to_string());
|
||||
let pg_params = json_value_to_pg_text(json);
|
||||
assert_eq!(pg_params, Some("foo\"".to_owned()));
|
||||
|
||||
let json = vec![Value::Null];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(pg_params, vec![None]);
|
||||
let json = Value::Null;
|
||||
let pg_params = json_value_to_pg_text(json);
|
||||
assert_eq!(pg_params, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -286,31 +801,27 @@ mod tests {
|
||||
// atoms and escaping
|
||||
let json = "[true, false, null, \"NULL\", 42, \"foo\", \"bar\\\"-\\\\\"]";
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]);
|
||||
let pg_params = json_value_to_pg_text(json);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(
|
||||
"{true,false,NULL,\"NULL\",42,\"foo\",\"bar\\\"-\\\\\"}".to_owned()
|
||||
)]
|
||||
Some("{true,false,NULL,\"NULL\",42,\"foo\",\"bar\\\"-\\\\\"}".to_owned())
|
||||
);
|
||||
|
||||
// nested arrays
|
||||
let json = "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]";
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]);
|
||||
let pg_params = json_value_to_pg_text(json);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(
|
||||
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
|
||||
)]
|
||||
Some("{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned())
|
||||
);
|
||||
// array of objects
|
||||
let json = r#"[{"foo": 1},{"bar": 2}]"#;
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]);
|
||||
let pg_params = json_value_to_pg_text(json);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
|
||||
Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -21,8 +22,6 @@ use hyper1::Response;
|
||||
use hyper1::StatusCode;
|
||||
use hyper1::{HeaderMap, Request};
|
||||
use pq_proto::StartupMessageParamsBuilder;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use tokio::time;
|
||||
use tokio_postgres::error::DbError;
|
||||
use tokio_postgres::error::ErrorPosition;
|
||||
@@ -51,15 +50,18 @@ use crate::context::RequestMonitoring;
|
||||
use crate::error::ErrorKind;
|
||||
use crate::error::ReportableError;
|
||||
use crate::error::UserFacingError;
|
||||
use crate::http::parse_json_body_with_limit;
|
||||
use crate::metrics::HttpDirection;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::proxy::run_until_cancelled;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::serverless::backend::HttpConnError;
|
||||
use crate::serverless::json::Arena;
|
||||
use crate::serverless::json::SerdeArena;
|
||||
use crate::usage_metrics::MetricCounterRecorder;
|
||||
use crate::DbName;
|
||||
use crate::RoleName;
|
||||
|
||||
use super::backend::HttpConnError;
|
||||
use super::backend::LocalProxyConnError;
|
||||
use super::backend::PoolingBackend;
|
||||
use super::conn_pool::AuthData;
|
||||
@@ -67,28 +69,21 @@ use super::conn_pool::Client;
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::ConnInfoWithAuth;
|
||||
use super::http_util::json_response;
|
||||
use super::json::json_to_pg_text;
|
||||
use super::json::pg_text_row_to_json;
|
||||
use super::json::JsonConversionError;
|
||||
use super::json::Slice;
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct QueryData {
|
||||
query: String,
|
||||
#[serde(deserialize_with = "bytes_to_pg_text")]
|
||||
params: Vec<Option<String>>,
|
||||
#[serde(default)]
|
||||
array_mode: Option<bool>,
|
||||
pub(crate) struct QueryData {
|
||||
pub(crate) query: Slice,
|
||||
pub(crate) params: Slice,
|
||||
pub(crate) array_mode: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct BatchQueryData {
|
||||
queries: Vec<QueryData>,
|
||||
pub(crate) struct BatchQueryData {
|
||||
pub(crate) queries: Vec<QueryData>,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(untagged)]
|
||||
enum Payload {
|
||||
pub(crate) enum Payload {
|
||||
Single(QueryData),
|
||||
Batch(BatchQueryData),
|
||||
}
|
||||
@@ -103,15 +98,6 @@ static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrab
|
||||
|
||||
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
|
||||
|
||||
fn bytes_to_pg_text<'de, D>(deserializer: D) -> Result<Vec<Option<String>>, D::Error>
|
||||
where
|
||||
D: serde::de::Deserializer<'de>,
|
||||
{
|
||||
// TODO: consider avoiding the allocation here.
|
||||
let json: Vec<Value> = serde::de::Deserialize::deserialize(deserializer)?;
|
||||
Ok(json_to_pg_text(json))
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum ConnInfoError {
|
||||
#[error("invalid header: {0}")]
|
||||
@@ -381,7 +367,7 @@ pub(crate) async fn handle(
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum SqlOverHttpError {
|
||||
#[error("{0}")]
|
||||
ReadPayload(#[from] ReadPayloadError),
|
||||
ReadPayload(ReadPayloadError),
|
||||
#[error("{0}")]
|
||||
ConnectCompute(#[from] HttpConnError),
|
||||
#[error("{0}")]
|
||||
@@ -435,9 +421,9 @@ impl UserFacingError for SqlOverHttpError {
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum ReadPayloadError {
|
||||
#[error("could not read the HTTP request body: {0}")]
|
||||
Read(#[from] hyper1::Error),
|
||||
Read(hyper1::Error),
|
||||
#[error("could not parse the HTTP request body: {0}")]
|
||||
Parse(#[from] serde_json::Error),
|
||||
Parse(serde_json::Error),
|
||||
}
|
||||
|
||||
impl ReportableError for ReadPayloadError {
|
||||
@@ -449,6 +435,18 @@ impl ReportableError for ReadPayloadError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::http::ReadPayloadError<hyper1::Error>> for SqlOverHttpError {
|
||||
fn from(value: crate::http::ReadPayloadError<hyper1::Error>) -> Self {
|
||||
match value {
|
||||
crate::http::ReadPayloadError::Read(e) => Self::ReadPayload(ReadPayloadError::Read(e)),
|
||||
crate::http::ReadPayloadError::Parse(e) => {
|
||||
Self::ReadPayload(ReadPayloadError::Parse(e))
|
||||
}
|
||||
crate::http::ReadPayloadError::LengthExceeded(x) => Self::RequestTooLarge(x as u64),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum SqlOverHttpCancel {
|
||||
#[error("query was cancelled")]
|
||||
@@ -581,16 +579,16 @@ async fn handle_db_inner(
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
let headers = request.headers();
|
||||
let (parts, body) = request.into_parts();
|
||||
|
||||
// Allow connection pooling only if explicitly requested
|
||||
// or if we have decided that http pool is no longer opt-in
|
||||
let allow_pool = !config.http_config.pool_options.opt_in
|
||||
|| headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
|
||||
|| parts.headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
|
||||
|
||||
let parsed_headers = HttpHeaders::try_parse(headers)?;
|
||||
let parsed_headers = HttpHeaders::try_parse(&parts.headers)?;
|
||||
|
||||
let request_content_length = match request.body().size_hint().upper() {
|
||||
let request_content_length = match body.size_hint().upper() {
|
||||
Some(v) => v,
|
||||
None => config.http_config.max_request_size_bytes + 1,
|
||||
};
|
||||
@@ -608,15 +606,20 @@ async fn handle_db_inner(
|
||||
));
|
||||
}
|
||||
|
||||
let fetch_and_process_request = Box::pin(
|
||||
async {
|
||||
let body = request.into_body().collect().await?.to_bytes();
|
||||
info!(length = body.len(), "request payload read");
|
||||
let payload: Payload = serde_json::from_slice(&body)?;
|
||||
Ok::<Payload, ReadPayloadError>(payload) // Adjust error type accordingly
|
||||
}
|
||||
.map_err(SqlOverHttpError::from),
|
||||
);
|
||||
let fetch_and_process_request = Box::pin(async move {
|
||||
let mut arena = Arena::default();
|
||||
let seed = SerdeArena {
|
||||
arena: &mut arena,
|
||||
_t: PhantomData::<Payload>,
|
||||
};
|
||||
let payload = parse_json_body_with_limit(
|
||||
seed,
|
||||
body,
|
||||
config.http_config.max_request_size_bytes as usize,
|
||||
)
|
||||
.await?;
|
||||
Ok::<(Arena, Payload), SqlOverHttpError>((arena, payload)) // Adjust error type accordingly
|
||||
});
|
||||
|
||||
let authenticate_and_connect = Box::pin(
|
||||
async {
|
||||
@@ -659,7 +662,7 @@ async fn handle_db_inner(
|
||||
.map_err(SqlOverHttpError::from),
|
||||
);
|
||||
|
||||
let (payload, mut client) = match run_until_cancelled(
|
||||
let ((mut arena, payload), mut client) = match run_until_cancelled(
|
||||
// Run both operations in parallel
|
||||
try_join(
|
||||
pin!(fetch_and_process_request),
|
||||
@@ -673,6 +676,9 @@ async fn handle_db_inner(
|
||||
None => return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Connect)),
|
||||
};
|
||||
|
||||
arena.params_arena.shrink_to_fit();
|
||||
arena.str_arena.shrink_to_fit();
|
||||
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/json");
|
||||
@@ -680,7 +686,7 @@ async fn handle_db_inner(
|
||||
// Now execute the query and return the result.
|
||||
let json_output = match payload {
|
||||
Payload::Single(stmt) => {
|
||||
stmt.process(config, cancel, &mut client, parsed_headers)
|
||||
stmt.process(config, &arena, cancel, &mut client, parsed_headers)
|
||||
.await?
|
||||
}
|
||||
Payload::Batch(statements) => {
|
||||
@@ -698,11 +704,18 @@ async fn handle_db_inner(
|
||||
}
|
||||
|
||||
statements
|
||||
.process(config, cancel, &mut client, parsed_headers)
|
||||
.process(config, &arena, cancel, &mut client, parsed_headers)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
str_len = arena.str_arena.len(),
|
||||
params = arena.params_arena.len(),
|
||||
response = json_output.len(),
|
||||
"data size"
|
||||
);
|
||||
|
||||
let metrics = client.metrics();
|
||||
|
||||
let len = json_output.len();
|
||||
@@ -790,6 +803,7 @@ impl QueryData {
|
||||
async fn process(
|
||||
self,
|
||||
config: &'static ProxyConfig,
|
||||
arena: &Arena,
|
||||
cancel: CancellationToken,
|
||||
client: &mut Client<tokio_postgres::Client>,
|
||||
parsed_headers: HttpHeaders,
|
||||
@@ -798,7 +812,14 @@ impl QueryData {
|
||||
let cancel_token = inner.cancel_token();
|
||||
|
||||
let res = match select(
|
||||
pin!(query_to_json(config, &*inner, self, &mut 0, parsed_headers)),
|
||||
pin!(query_to_json(
|
||||
config,
|
||||
arena,
|
||||
&*inner,
|
||||
self,
|
||||
&mut 0,
|
||||
parsed_headers
|
||||
)),
|
||||
pin!(cancel.cancelled()),
|
||||
)
|
||||
.await
|
||||
@@ -806,10 +827,7 @@ impl QueryData {
|
||||
// The query successfully completed.
|
||||
Either::Left((Ok((status, results)), __not_yet_cancelled)) => {
|
||||
discard.check_idle(status);
|
||||
|
||||
let json_output =
|
||||
serde_json::to_string(&results).expect("json serialization should not fail");
|
||||
Ok(json_output)
|
||||
Ok(results)
|
||||
}
|
||||
// The query failed with an error
|
||||
Either::Left((Err(e), __not_yet_cancelled)) => {
|
||||
@@ -864,6 +882,7 @@ impl BatchQueryData {
|
||||
async fn process(
|
||||
self,
|
||||
config: &'static ProxyConfig,
|
||||
arena: &Arena,
|
||||
cancel: CancellationToken,
|
||||
client: &mut Client<tokio_postgres::Client>,
|
||||
parsed_headers: HttpHeaders,
|
||||
@@ -890,6 +909,7 @@ impl BatchQueryData {
|
||||
|
||||
let json_output = match query_batch(
|
||||
config,
|
||||
arena,
|
||||
cancel.child_token(),
|
||||
&transaction,
|
||||
self,
|
||||
@@ -934,16 +954,20 @@ impl BatchQueryData {
|
||||
|
||||
async fn query_batch(
|
||||
config: &'static ProxyConfig,
|
||||
arena: &Arena,
|
||||
cancel: CancellationToken,
|
||||
transaction: &Transaction<'_>,
|
||||
queries: BatchQueryData,
|
||||
parsed_headers: HttpHeaders,
|
||||
) -> Result<String, SqlOverHttpError> {
|
||||
let mut results = Vec::with_capacity(queries.queries.len());
|
||||
let mut comma = false;
|
||||
let mut results = r#"{"results":["#.to_string();
|
||||
|
||||
let mut current_size = 0;
|
||||
for stmt in queries.queries {
|
||||
let query = pin!(query_to_json(
|
||||
config,
|
||||
arena,
|
||||
transaction,
|
||||
stmt,
|
||||
&mut current_size,
|
||||
@@ -954,7 +978,11 @@ async fn query_batch(
|
||||
match res {
|
||||
// TODO: maybe we should check that the transaction bit is set here
|
||||
Either::Left((Ok((_, values)), _cancelled)) => {
|
||||
results.push(values);
|
||||
if comma {
|
||||
results.push(',');
|
||||
}
|
||||
results.push_str(&values);
|
||||
comma = true;
|
||||
}
|
||||
Either::Left((Err(e), _cancelled)) => {
|
||||
return Err(e);
|
||||
@@ -965,22 +993,28 @@ async fn query_batch(
|
||||
}
|
||||
}
|
||||
|
||||
let results = json!({ "results": results });
|
||||
let json_output = serde_json::to_string(&results).expect("json serialization should not fail");
|
||||
results.push_str("]}");
|
||||
|
||||
Ok(json_output)
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
async fn query_to_json<T: GenericClient>(
|
||||
config: &'static ProxyConfig,
|
||||
arena: &Arena,
|
||||
client: &T,
|
||||
data: QueryData,
|
||||
current_size: &mut usize,
|
||||
parsed_headers: HttpHeaders,
|
||||
) -> Result<(ReadyForQueryStatus, impl Serialize), SqlOverHttpError> {
|
||||
) -> Result<(ReadyForQueryStatus, String), SqlOverHttpError> {
|
||||
info!("executing query");
|
||||
let query_params = data.params;
|
||||
let mut row_stream = std::pin::pin!(client.query_raw_txt(&data.query, query_params).await?);
|
||||
|
||||
let query_params = arena.params_arena[data.params.into_range()]
|
||||
.iter()
|
||||
.map(|p| p.map(|p| &arena.str_arena[p.into_range()]));
|
||||
|
||||
let query = &arena.str_arena[data.query.into_range()];
|
||||
|
||||
let mut row_stream = std::pin::pin!(client.query_raw_txt(query, query_params).await?);
|
||||
info!("finished executing query");
|
||||
|
||||
// Manually drain the stream into a vector to leave row_stream hanging
|
||||
@@ -1049,12 +1083,13 @@ async fn query_to_json<T: GenericClient>(
|
||||
|
||||
// Resulting JSON format is based on the format of node-postgres result.
|
||||
let results = json!({
|
||||
"command": command_tag_name.to_string(),
|
||||
"command": command_tag_name,
|
||||
"rowCount": command_tag_count,
|
||||
"rows": rows,
|
||||
"fields": fields,
|
||||
"rowAsArray": array_mode,
|
||||
});
|
||||
})
|
||||
.to_string();
|
||||
|
||||
Ok((ready, results))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user