mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Further work on sql over http
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
use crate::http::pg_to_json::postgres_row_to_json_value;
|
||||
use crate::{
|
||||
auth, cancellation::CancelMap, config::ProxyConfig, console, error::io_error,
|
||||
proxy::handle_ws_client,
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
use hashbrown::HashMap;
|
||||
use hyper::{
|
||||
server::{accept, conn::AddrIncoming},
|
||||
upgrade::Upgraded,
|
||||
@@ -15,9 +17,7 @@ use pq_proto::StartupMessageParams;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use percent_encoding::percent_decode;
|
||||
use tokio_postgres::types::{ToSql, FromSql};
|
||||
use std::collections::HashMap;
|
||||
use tokio_postgres::types::{ToSql};
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
future::ready,
|
||||
@@ -32,7 +32,7 @@ use tokio::{
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
use url::{form_urlencoded, Url};
|
||||
use url::{Url};
|
||||
use utils::http::{error::ApiError, json::json_response};
|
||||
|
||||
// TODO: use `std::sync::Exclusive` once it's stabilized.
|
||||
@@ -235,19 +235,11 @@ async fn handle_sql(
|
||||
.next()
|
||||
.ok_or(anyhow::anyhow!("invalid database name"))?;
|
||||
|
||||
let username = match connection_url.username() {
|
||||
"" => return Err(anyhow::anyhow!("empty username")),
|
||||
s => Ok(s)
|
||||
}?;
|
||||
let username = connection_url.username();
|
||||
|
||||
let maybe_empty_password = connection_url
|
||||
let password = connection_url
|
||||
.password()
|
||||
.ok_or(anyhow::anyhow!("no password"))?;
|
||||
|
||||
let password = match maybe_empty_password {
|
||||
"" => return Err(anyhow::anyhow!("empty password")),
|
||||
s => Ok(s)
|
||||
}?;
|
||||
|
||||
let hostname = connection_url
|
||||
.host_str()
|
||||
@@ -260,7 +252,7 @@ async fn handle_sql(
|
||||
.and_then(|h| h.split(':').next());
|
||||
|
||||
match host_header {
|
||||
Some(h) if h == hostname => Ok(h),
|
||||
Some(h) if h == hostname => h,
|
||||
Some(_) => return Err(anyhow::anyhow!("mismatched host header and hostname")),
|
||||
None => return Err(anyhow::anyhow!("no host header"))
|
||||
};
|
||||
@@ -325,28 +317,22 @@ async fn handle_sql(
|
||||
|
||||
let query = &queryData.query;
|
||||
let params = queryData.params.iter().map(|value| match value {
|
||||
Value::Null => None as &(dyn ToSql + Sync),
|
||||
// Value::Null => &None as &(dyn ToSql + Sync),
|
||||
Value::Bool(b) => b as &(dyn ToSql + Sync),
|
||||
Value::Number(n) => &n.as_f64() as &(dyn ToSql + Sync),
|
||||
Value::String(s) => s as &(dyn ToSql + Sync),
|
||||
_ => panic!("wrong parameter type")
|
||||
}).collect::<Vec<&(dyn ToSql + Sync)>>().as_ref();
|
||||
}).collect::<Vec<&(dyn ToSql + Sync)>>();
|
||||
|
||||
let rows: Vec<HashMap<_, _>> = client
|
||||
.query(query, params)
|
||||
let rows: Result<Vec<serde_json::Value>, anyhow::Error> = client
|
||||
.query(query, params.as_ref())
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(|row| {
|
||||
let mut serialized_row: HashMap<String, String> = HashMap::new();
|
||||
for i in 0..row.len() {
|
||||
let col = row.columns().get(i).map_or("?", |c| c.name());
|
||||
let val = row.get(i).unwrap_or("?");
|
||||
serialized_row.insert(col.into(), val.into());
|
||||
}
|
||||
Some(serialized_row)
|
||||
})
|
||||
.map(postgres_row_to_json_value)
|
||||
.collect();
|
||||
|
||||
let rows = rows?;
|
||||
|
||||
Ok(serde_json::to_string(&rows)?)
|
||||
}
|
||||
|
||||
@@ -354,6 +340,7 @@ pub struct ConnectionCache {
|
||||
connections: HashMap<String, tokio_postgres::Client>,
|
||||
}
|
||||
|
||||
/*
|
||||
impl ConnectionCache {
|
||||
pub fn new() -> Arc<Mutex<Self>> {
|
||||
Arc::new(Mutex::new(Self {
|
||||
@@ -398,13 +385,13 @@ impl ConnectionCache {
|
||||
.into_iter()
|
||||
.filter_map(|el| {
|
||||
if let tokio_postgres::SimpleQueryMessage::Row(row) = el {
|
||||
let mut serilaized_row: HashMap<String, String> = HashMap::new();
|
||||
let mut serialized_row: HashMap<String, String> = HashMap::new();
|
||||
for i in 0..row.len() {
|
||||
let col = row.columns().get(i).map_or("?", |c| c.name());
|
||||
let val = row.get(i).unwrap_or("?");
|
||||
serilaized_row.insert(col.into(), val.into());
|
||||
serialized_row.insert(col.into(), val.into());
|
||||
}
|
||||
Some(serilaized_row)
|
||||
Some(serialized_row)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -414,6 +401,7 @@ impl ConnectionCache {
|
||||
Ok(serde_json::to_string(&rows)?)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
|
||||
Reference in New Issue
Block a user