diff --git a/proxy/src/http.rs b/proxy/src/http.rs index 4a2f7c1152..a544157800 100644 --- a/proxy/src/http.rs +++ b/proxy/src/http.rs @@ -2,7 +2,6 @@ //! Other modules should use stuff from this module instead of //! directly relying on deps like `reqwest` (think loose coupling). -pub mod pg_to_json; pub mod server; pub mod websocket; diff --git a/proxy/src/http/pg_to_json.rs b/proxy/src/http/pg_to_json.rs deleted file mode 100644 index 01a106f7b9..0000000000 --- a/proxy/src/http/pg_to_json.rs +++ /dev/null @@ -1,140 +0,0 @@ -// use std::iter::Map; - -// use std::Map; -use anyhow::{anyhow, Context}; -use serde_json::Map; -use tokio_postgres::{ - types::{FromSql, Type}, - Column, Row, -}; - -// some type-aliases I use in my project -pub type JSONValue = serde_json::Value; -pub type RowData = Map; -pub type Error = anyhow::Error; // from: https://github.com/dtolnay/anyhow - -pub fn postgres_row_to_json_value(row: Row) -> Result { - let row_data = postgres_row_to_row_data(row)?; - Ok(JSONValue::Object(row_data)) -} - -pub fn postgres_row_to_row_data(row: Row) -> Result { - let mut result: Map = Map::new(); - for (i, column) in row.columns().iter().enumerate() { - let name = column.name(); - let json_value = pg_cell_to_json_value(&row, column, i)?; - result.insert(name.to_string(), json_value); - } - Ok(result) -} - -pub fn pg_cell_to_json_value( - row: &Row, - column: &Column, - column_i: usize, -) -> Result { - let f64_to_json_number = |raw_val: f64| -> Result { - let temp = serde_json::Number::from_f64(raw_val).ok_or(anyhow!("invalid json-float"))?; - Ok(JSONValue::Number(temp)) - }; - Ok(match *column.type_() { - // for rust-postgres <> postgres type-mappings: https://docs.rs/postgres/latest/postgres/types/trait.FromSql.html#types - // for postgres types: https://www.postgresql.org/docs/7.4/datatype.html#DATATYPE-TABLE - - // single types - Type::BOOL => get_basic(row, column, column_i, |a: bool| Ok(JSONValue::Bool(a)))?, - Type::INT2 => get_basic(row, column, column_i, |a: i16| { - Ok(JSONValue::Number(serde_json::Number::from(a))) - })?, - Type::INT4 => get_basic(row, column, column_i, |a: i32| { - Ok(JSONValue::Number(serde_json::Number::from(a))) - })?, - Type::INT8 => get_basic(row, column, column_i, |a: i64| { - Ok(JSONValue::Number(serde_json::Number::from(a))) - })?, - Type::TEXT | Type::VARCHAR => { - get_basic(row, column, column_i, |a: String| Ok(JSONValue::String(a)))? - } - // Type::JSON | Type::JSONB => get_basic(row, column, column_i, |a: JSONValue| Ok(a))?, - Type::FLOAT4 => get_basic(row, column, column_i, |a: f32| f64_to_json_number(a.into()))?, - Type::FLOAT8 => get_basic(row, column, column_i, f64_to_json_number)?, - // these types require a custom StringCollector struct as an intermediary (see struct at bottom) - Type::TS_VECTOR => get_basic(row, column, column_i, |a: StringCollector| { - Ok(JSONValue::String(a.0)) - })?, - - // array types - Type::BOOL_ARRAY => get_array(row, column, column_i, |a: bool| Ok(JSONValue::Bool(a)))?, - Type::INT2_ARRAY => get_array(row, column, column_i, |a: i16| { - Ok(JSONValue::Number(serde_json::Number::from(a))) - })?, - Type::INT4_ARRAY => get_array(row, column, column_i, |a: i32| { - Ok(JSONValue::Number(serde_json::Number::from(a))) - })?, - Type::INT8_ARRAY => get_array(row, column, column_i, |a: i64| { - Ok(JSONValue::Number(serde_json::Number::from(a))) - })?, - Type::TEXT_ARRAY | Type::VARCHAR_ARRAY => { - get_array(row, column, column_i, |a: String| Ok(JSONValue::String(a)))? - } - // Type::JSON_ARRAY | Type::JSONB_ARRAY => get_array(row, column, column_i, |a: JSONValue| Ok(a))?, - Type::FLOAT4_ARRAY => get_array(row, column, column_i, f64_to_json_number)?, - Type::FLOAT8_ARRAY => get_array(row, column, column_i, f64_to_json_number)?, - // these types require a custom StringCollector struct as an intermediary (see struct at bottom) - Type::TS_VECTOR_ARRAY => get_array(row, column, column_i, |a: StringCollector| { - Ok(JSONValue::String(a.0)) - })?, - - _ => anyhow::bail!( - "Cannot convert pg-cell \"{}\" of type \"{}\" to a JSONValue.", - column.name(), - column.type_().name() - ), - }) -} - -fn get_basic<'a, T: FromSql<'a>>( - row: &'a Row, - column: &Column, - column_i: usize, - val_to_json_val: impl Fn(T) -> Result, -) -> Result { - let raw_val = row - .try_get::<_, Option>(column_i) - .with_context(|| format!("column_name:{}", column.name()))?; - raw_val.map_or(Ok(JSONValue::Null), val_to_json_val) -} -fn get_array<'a, T: FromSql<'a>>( - row: &'a Row, - column: &Column, - column_i: usize, - val_to_json_val: impl Fn(T) -> Result, -) -> Result { - let raw_val_array = row - .try_get::<_, Option>>(column_i) - .with_context(|| format!("column_name:{}", column.name()))?; - Ok(match raw_val_array { - Some(val_array) => { - let mut result = vec![]; - for val in val_array { - result.push(val_to_json_val(val)?); - } - JSONValue::Array(result) - } - None => JSONValue::Null, - }) -} - -struct StringCollector(String); -impl FromSql<'_> for StringCollector { - fn from_sql( - _: &Type, - raw: &[u8], - ) -> Result> { - let result = std::str::from_utf8(raw)?; - Ok(StringCollector(result.to_owned())) - } - fn accepts(_ty: &Type) -> bool { - true - } -} diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs index 2b4e088271..accb175352 100644 --- a/proxy/src/http/websocket.rs +++ b/proxy/src/http/websocket.rs @@ -1,11 +1,9 @@ -use crate::http::pg_to_json::postgres_row_to_json_value; use crate::{ auth, cancellation::CancelMap, config::ProxyConfig, console, error::io_error, proxy::handle_ws_client, }; use bytes::{Buf, Bytes}; use futures::{Sink, Stream, StreamExt}; -use hashbrown::HashMap; use hyper::{ server::{accept, conn::AddrIncoming}, upgrade::Upgraded, @@ -16,6 +14,7 @@ use pin_project_lite::pin_project; use pq_proto::StartupMessageParams; use percent_encoding::percent_decode; +use std::collections::HashMap; use std::{ convert::Infallible, future::ready, @@ -287,19 +286,26 @@ async fn handle_sql( }); let sql = percent_decode(sql.as_bytes()).decode_utf8()?.to_string(); - info!("!!!! query: '{}'", sql); - let rows: Result, anyhow::Error> = client - .query(&sql, &[]) + let rows: Vec> = client + .simple_query(&sql) .await? .into_iter() - .map(postgres_row_to_json_value) + .filter_map(|el| { + if let tokio_postgres::SimpleQueryMessage::Row(row) = el { + let mut serilaized_row: HashMap = HashMap::new(); + for i in 0..row.len() { + let col = row.columns().get(i).map_or("?", |c| c.name()); + let val = row.get(i).unwrap_or("?"); + serilaized_row.insert(col.into(), val.into()); + } + Some(serilaized_row) + } else { + None + } + }) .collect(); - let rows = rows?; - - info!("!!!! n_rows: {}", rows.len()); - Ok(serde_json::to_string(&rows)?) }