From be6a259b856ea8929dc816a0a2acafca99f1383c Mon Sep 17 00:00:00 2001 From: Ruslan Talpa Date: Wed, 25 Jun 2025 14:33:45 +0300 Subject: [PATCH] subzero integration WIP5 cleanup and postprocess the response and set the correct headers/status and handle errors --- proxy/src/serverless/rest.rs | 1150 +++++++++++++++++++--------------- 1 file changed, 644 insertions(+), 506 deletions(-) diff --git a/proxy/src/serverless/rest.rs b/proxy/src/serverless/rest.rs index 9a26a84b72..4099c37e3a 100644 --- a/proxy/src/serverless/rest.rs +++ b/proxy/src/serverless/rest.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::sync::Arc; use bytes::Bytes; @@ -12,13 +11,12 @@ use hyper::body::Incoming; use hyper::http::{HeaderName, HeaderValue}; use hyper::{HeaderMap, Request, Response, StatusCode}; use indexmap::IndexMap; -use postgres_client::error::{DbError, ErrorPosition, SqlState}; use serde_json::{value::RawValue, Value as JsonValue}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info}; +use tracing::{error, info}; use typed_json::json; use url::Url; use uuid::Uuid; @@ -42,24 +40,24 @@ use crate::serverless::backend::HttpConnError; use crate::types::{DbName, RoleName}; use subzero_core::{ - api::{ApiRequest, ApiResponse, ContentType::*, SingleVal, ListVal, Payload}, - error::Error::{self as SubzeroCoreError, SingularityError, PutMatchingPkError, PermissionDenied, JsonDeserialize, NotFound, JwtTokenInvalid, InternalError}, + api::{SingleVal, ListVal, Payload}, + error::Error::{self as SubzeroCoreError, JsonDeserialize, NotFound, JwtTokenInvalid, InternalError, GucHeadersError, GucStatusError, ContentTypeError}, schema::DbSchema, formatter::{ Param, Param::*, postgresql::{fmt_main_query, generate}, - ToParam, Snippet, SqlParam, + Snippet, SqlParam, }, - error::JsonDeserializeSnafu, dynamic_statement::{param, sql, JoinIterator}, }; use subzero_core::{ - api::{ContentType, ContentType::*, Preferences, QueryNode::*, Representation, Resolution::*,}, - error::{*}, + api::{ContentType::*, Preferences, QueryNode::*, Representation, Resolution::*, }, + error::{*, pg_error_to_status_code}, parser::postgrest::parse, - permissions::{check_safe_functions, check_privileges, insert_policy_conditions, replace_select_star}, + permissions::{check_safe_functions}, api::DEFAULT_SAFE_SELECT_FUNCTIONS, + api::ApiResponse, }; use std::collections::HashMap; use jsonpath_lib::select; @@ -71,18 +69,14 @@ use url::form_urlencoded; pub(super) static NEON_REQUEST_ID: HeaderName = HeaderName::from_static("neon-request-id"); static CONN_STRING: HeaderName = HeaderName::from_static("neon-connection-string"); -static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output"); -static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode"); -static ALLOW_POOL: HeaderName = HeaderName::from_static("neon-pool-opt-in"); +//static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output"); +//static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode"); +//static ALLOW_POOL: HeaderName = HeaderName::from_static("neon-pool-opt-in"); static TXN_ISOLATION_LEVEL: HeaderName = HeaderName::from_static("neon-batch-isolation-level"); static TXN_READ_ONLY: HeaderName = HeaderName::from_static("neon-batch-read-only"); -static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrable"); - -static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true"); - -// FIXME: remove this header -static HACK_TRUST_ROLE_SWITCHING: HeaderName = 
HeaderName::from_static("neon-hack-trust-role-switching");
+//static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrable");
+//static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
 
 
 #[derive(Debug, thiserror::Error)]
@@ -267,6 +261,337 @@ fn get_conn_info(
     Ok(ConnInfoWithAuth { conn_info, auth })
 }
 
+
+// we use our own type because we get the error from the json response
+#[derive(Debug, thiserror::Error)]
+pub(crate) struct PostgresError {
+    pub code: String,
+    pub message: String,
+    pub detail: Option<String>,
+    pub hint: Option<String>
+}
+impl HttpCodeError for PostgresError {
+    fn get_http_status_code(&self) -> StatusCode {
+        let status = pg_error_to_status_code(&self.code, true);
+        StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
+    }
+}
+impl ReportableError for PostgresError {
+    fn get_error_kind(&self) -> ErrorKind {
+        ErrorKind::User
+    }
+}
+impl UserFacingError for PostgresError {
+    fn to_string_client(&self) -> String {
+        if self.code.starts_with("PT") {
+            "Postgres error".to_string()
+        } else {
+            self.message.clone()
+        }
+    }
+}
+
+impl std::fmt::Display for PostgresError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.message)
+    }
+}
+
+
+
+#[derive(Debug, thiserror::Error)]
+pub(crate) enum RestError {
+    #[error("{0}")]
+    ReadPayload(#[from] ReadPayloadError),
+    #[error("{0}")]
+    ConnectCompute(#[from] HttpConnError),
+    #[error("{0}")]
+    ConnInfo(#[from] ConnInfoError),
+    //#[error("response is too large (max is {0} bytes)")]
+    //ResponseTooLarge(usize),
+    //#[error("invalid isolation level")]
+    //InvalidIsolationLevel,
+    /// for queries our customers choose to run
+    //#[error("{0}")]
+    //Postgres(#[source] postgres_client::Error),
+    #[error("{0}")]
+    Postgres(#[source] PostgresError),
+    /// for queries we choose to run
+    //#[error("{0}")]
+    //InternalPostgres(#[source] postgres_client::Error),
+    #[error("{0}")]
+    JsonConversion(#[from] JsonConversionError),
+    //#[error("{0}")]
+    //Cancelled(SqlOverHttpCancel),
+    #[error("{0}")]
+    SubzeroCore(#[source] SubzeroCoreError),
+}
+
+impl ReportableError for RestError {
+    fn get_error_kind(&self) -> ErrorKind {
+        match self {
+            RestError::ReadPayload(e) => e.get_error_kind(),
+            RestError::ConnectCompute(e) => e.get_error_kind(),
+            RestError::ConnInfo(e) => e.get_error_kind(),
+            //RestError::ResponseTooLarge(_) => ErrorKind::User,
+            //RestError::InvalidIsolationLevel => ErrorKind::User,
+            RestError::Postgres(_) => ErrorKind::Postgres,
+            // RestError::InternalPostgres(p) => {
+            //     if p.as_db_error().is_some() {
+            //         ErrorKind::Service
+            //     } else {
+            //         ErrorKind::Compute
+            //     }
+            // }
+            RestError::JsonConversion(_) => ErrorKind::Postgres,
+            //RestError::Cancelled(c) => c.get_error_kind(),
+            RestError::SubzeroCore(_) => ErrorKind::User,
+        }
+    }
+}
+
+impl UserFacingError for RestError {
+    fn to_string_client(&self) -> String {
+        match self {
+            RestError::ReadPayload(p) => p.to_string(),
+            RestError::ConnectCompute(c) => c.to_string_client(),
+            RestError::ConnInfo(c) => c.to_string_client(),
+            //RestError::ResponseTooLarge(_) => self.to_string(),
+            //RestError::InvalidIsolationLevel => self.to_string(),
+            RestError::Postgres(p) => p.to_string_client(),
+            //RestError::InternalPostgres(p) => p.to_string(),
+            RestError::JsonConversion(_) => "could not parse postgres response".to_string(),
+            //RestError::Cancelled(_) => self.to_string(),
+            RestError::SubzeroCore(s) => {
+                // TODO: this is a hack to get the message from the json body
+                let 
json = s.json_body(); + let default_message = "Unknown error".to_string(); + let message = json.get("message").map_or(default_message.clone(), |m| + match m { + JsonValue::String(s) => s.clone(), + _ => default_message, + } + ); + message + } + } + } +} + +impl HttpCodeError for RestError { + fn get_http_status_code(&self) -> StatusCode { + match self { + RestError::ReadPayload(e) => e.get_http_status_code(), + RestError::ConnectCompute(h) => match h.get_error_kind() { + ErrorKind::User => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }, + RestError::ConnInfo(_) => StatusCode::BAD_REQUEST, + //RestError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE, + //RestError::InvalidIsolationLevel => StatusCode::BAD_REQUEST, + RestError::Postgres(e) => e.get_http_status_code(), + //RestError::InternalPostgres(_) => StatusCode::INTERNAL_SERVER_ERROR, + RestError::JsonConversion(_) => StatusCode::INTERNAL_SERVER_ERROR, + //RestError::Cancelled(_) => StatusCode::INTERNAL_SERVER_ERROR, + RestError::SubzeroCore(e) => { + let status = e.status_code(); + StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) + } + } + } +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum ReadPayloadError { + #[error("could not read the HTTP request body: {0}")] + Read(#[from] hyper::Error), + #[error("request is too large (max is {limit} bytes)")] + BodyTooLarge { limit: usize }, + #[error("could not parse the HTTP request body: {0}")] + Parse(#[from] serde_json::Error), +} + +impl From> for ReadPayloadError { + fn from(value: ReadBodyError) -> Self { + match value { + ReadBodyError::BodyTooLarge { limit } => Self::BodyTooLarge { limit }, + ReadBodyError::Read(e) => Self::Read(e), + } + } +} + +impl ReportableError for ReadPayloadError { + fn get_error_kind(&self) -> ErrorKind { + match self { + ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect, + ReadPayloadError::BodyTooLarge { .. } => ErrorKind::User, + ReadPayloadError::Parse(_) => ErrorKind::User, + } + } +} + +impl HttpCodeError for ReadPayloadError { + fn get_http_status_code(&self) -> StatusCode { + match self { + ReadPayloadError::Read(_) => StatusCode::BAD_REQUEST, + ReadPayloadError::BodyTooLarge { .. 
} => StatusCode::PAYLOAD_TOO_LARGE,
+            ReadPayloadError::Parse(_) => StatusCode::BAD_REQUEST,
+        }
+    }
+}
+
+
+
+pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue {
+    let mut uuid = [0; uuid::fmt::Hyphenated::LENGTH];
+    HeaderValue::from_str(id.as_hyphenated().encode_lower(&mut uuid[..]))
+        .expect("uuid hyphenated format should be all valid header characters")
+}
+
+static HEADERS_TO_FORWARD: &[&HeaderName] = &[
+    &AUTHORIZATION,
+];
+
+static JSON_SCHEMA: &str = r#"
+    {
+        "schemas":[
+            {
+                "name":"test",
+                "objects":[
+                    {
+                        "kind":"table",
+                        "name":"items",
+                        "columns":[
+                            {
+                                "name":"id",
+                                "data_type":"integer",
+                                "primary_key":true
+                            },
+                            {
+                                "name":"name",
+                                "data_type":"text"
+                            }
+                        ],
+                        "foreign_keys":[],
+                        "permissions":[]
+                    }
+                ]
+            }
+        ]
+    }
+"#;
+
+fn content_range_header(lower: i64, upper: i64, total: Option<i64>) -> String {
+    //debug!("content_range_header: lower: {}, upper: {}, total: {:?}", lower, upper, total);
+    let range_string = if total != Some(0) && lower <= upper {
+        format!("{lower}-{upper}")
+    } else {
+        "*".to_string()
+    };
+    let total_string = match total {
+        Some(t) => format!("{t}"),
+        None => "*".to_string(),
+    };
+    format!("{range_string}/{total_string}")
+}
+
+fn content_range_status(lower: i64, upper: i64, total: Option<i64>) -> u16 {
+    //debug!("content_range_status: lower: {}, upper: {}, total: {:?}", lower, upper, total);
+    match (lower, upper, total) {
+        //(_, _, None) => 200,
+        (l, _, Some(t)) if l > t => 406,
+        (l, u, Some(t)) if (1 + u - l) < t => 206,
+        _ => 200,
+    }
+}
+fn fmt_env_query<'a>(env: &'a HashMap<&'a str, &'a str>) -> Snippet<'a> {
+    "select "
+        + if env.is_empty() {
+            sql("null")
+        } else {
+            env.iter()
+                .map(|(k, v)| "set_config(" + param(k as &SqlParam) + ", " + param(v as &SqlParam) + ", true)")
+                .join(",")
+        }
+}
+
+fn current_schema(db_schemas: &Vec<String>, method: &Method, headers: &HeaderMap) -> Result<String, SubzeroCoreError> {
+    match (db_schemas.len() > 1, method, headers.get("accept-profile"), headers.get("content-profile")) {
+        (false, ..) 
=> Ok(db_schemas.first().unwrap_or(&"_inexistent_".to_string()).clone()), + (_, &Method::DELETE, _, Some(content_profile_header)) + | (_, &Method::POST, _, Some(content_profile_header)) + | (_, &Method::PATCH, _, Some(content_profile_header)) + | (_, &Method::PUT, _, Some(content_profile_header)) => { + match content_profile_header.to_str() { + Ok(content_profile_str) => { + let content_profile = String::from(content_profile_str); + if db_schemas.contains(&content_profile) { + Ok(content_profile) + } else { + Err(SubzeroCoreError::UnacceptableSchema { + schemas: db_schemas.clone(), + }) + } + } + Err(_) => Err(SubzeroCoreError::UnacceptableSchema { + schemas: db_schemas.clone(), + }) + } + } + (_, _, Some(accept_profile_header), _) => { + match accept_profile_header.to_str() { + Ok(accept_profile_str) => { + let accept_profile = String::from(accept_profile_str); + if db_schemas.contains(&accept_profile) { + Ok(accept_profile) + } else { + Err(SubzeroCoreError::UnacceptableSchema { + schemas: db_schemas.clone(), + }) + } + } + Err(_) => Err(SubzeroCoreError::UnacceptableSchema { + schemas: db_schemas.clone(), + }) + } + } + _ => Ok(db_schemas.first().unwrap_or(&"_inexistent_".to_string()).clone()), + } +} + +fn to_core_error(e: SubzeroCoreError) -> RestError { RestError::SubzeroCore(e) } + +fn to_sql_param(p: &Param) -> JsonValue { + match p { + SV(SingleVal(v, ..)) => { + JsonValue::String(v.to_string()) + } + Str(v) => { + JsonValue::String(v.to_string()) + } + StrOwned(v) => { + JsonValue::String((*v).clone()) + } + PL(Payload(v, ..)) => { + JsonValue::String(v.clone().into_owned()) + } + LV(ListVal(v, ..)) => { + if !v.is_empty() { + JsonValue::String(format!( + "{{\"{}\"}}", + v.iter() + .map(|e| e.replace('\\', "\\\\").replace('\"', "\\\"")) + .collect::>() + .join("\",\"") + )) + } else { + JsonValue::String(r#"{}"#.to_string()) + } + } + } +} + + pub(crate) async fn handle( config: &'static ProxyConfig, ctx: RequestContext, @@ -325,64 +650,51 @@ pub(crate) async fn handle( } r } - Err(e @ RestError::Cancelled(_)) => { + // Err(e @ RestError::Cancelled(_)) => { + // let error_kind = e.get_error_kind(); + // ctx.set_error_kind(error_kind); + + // let message = "Query cancelled, connection was terminated"; + + // tracing::info!( + // kind=error_kind.to_metric_label(), + // error=%e, + // msg=message, + // "forwarding error to user" + // ); + + // json_response( + // StatusCode::BAD_REQUEST, + // json!({ "message": message, "code": SqlState::PROTOCOL_VIOLATION.code() }), + // )? + // } + Err(e @ RestError::SubzeroCore(_)) => { let error_kind = e.get_error_kind(); ctx.set_error_kind(error_kind); - let message = "Query cancelled, connection was terminated"; - tracing::info!( kind=error_kind.to_metric_label(), error=%e, - msg=message, + msg="subzero core error", "forwarding error to user" ); + let subzero_err = match e { + RestError::SubzeroCore(e) => e, + _ => panic!("expected subzero core error"), + }; - json_response( - StatusCode::BAD_REQUEST, - json!({ "message": message, "code": SqlState::PROTOCOL_VIOLATION.code() }), - )? + let json_body = subzero_err.json_body(); + let status_code = StatusCode::from_u16(subzero_err.status_code()) + .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + + json_response(status_code, json_body)? 
} Err(e) => { let error_kind = e.get_error_kind(); ctx.set_error_kind(error_kind); - let mut message = e.to_string_client(); - let db_error = match &e { - RestError::ConnectCompute(HttpConnError::PostgresConnectionError(e)) - | RestError::Postgres(e) => e.as_db_error(), - _ => None, - }; - fn get<'a, T: Default>(db: Option<&'a DbError>, x: impl FnOnce(&'a DbError) -> T) -> T { - db.map(x).unwrap_or_default() - } - - if let Some(db_error) = db_error { - db_error.message().clone_into(&mut message); - } - - let position = db_error.and_then(|db| db.position()); - let (position, internal_position, internal_query) = match position { - Some(ErrorPosition::Original(position)) => (Some(position.to_string()), None, None), - Some(ErrorPosition::Internal { position, query }) => { - (None, Some(position.to_string()), Some(query.clone())) - } - None => (None, None, None), - }; - - let code = get(db_error, |db| db.code().code()); - let severity = get(db_error, |db| db.severity()); - let detail = get(db_error, |db| db.detail()); - let hint = get(db_error, |db| db.hint()); - let where_ = get(db_error, |db| db.where_()); - let table = get(db_error, |db| db.table()); - let column = get(db_error, |db| db.column()); - let schema = get(db_error, |db| db.schema()); - let datatype = get(db_error, |db| db.datatype()); - let constraint = get(db_error, |db| db.constraint()); - let file = get(db_error, |db| db.file()); - let line = get(db_error, |db| db.line().map(|l| l.to_string())); - let routine = get(db_error, |db| db.routine()); + let message = e.to_string_client(); + let status_code = e.get_http_status_code(); tracing::info!( kind=error_kind.to_metric_label(), @@ -391,26 +703,24 @@ pub(crate) async fn handle( "forwarding error to user" ); + let (code, detail, hint) = match e { + RestError::Postgres(e) => { + (if e.code.starts_with("PT") { + None + } else { + Some(e.code) + }, e.detail, e.hint) + }, + _ => (None, None, None), + }; + json_response( - e.get_http_status_code(), + status_code, json!({ "message": message, "code": code, "detail": detail, - "hint": hint, - "position": position, - "internalPosition": internal_position, - "internalQuery": internal_query, - "severity": severity, - "where": where_, - "table": table, - "column": column, - "schema": schema, - "dataType": datatype, - "constraint": constraint, - "file": file, - "line": line, - "routine": routine, + "hint": hint, }), )? 
} @@ -422,162 +732,6 @@ pub(crate) async fn handle( Ok(response) } -#[derive(Debug, thiserror::Error)] -pub(crate) enum RestError { - #[error("{0}")] - ReadPayload(#[from] ReadPayloadError), - #[error("{0}")] - ConnectCompute(#[from] HttpConnError), - #[error("{0}")] - ConnInfo(#[from] ConnInfoError), - #[error("response is too large (max is {0} bytes)")] - ResponseTooLarge(usize), - #[error("invalid isolation level")] - InvalidIsolationLevel, - /// for queries our customers choose to run - #[error("{0}")] - Postgres(#[source] postgres_client::Error), - /// for queries we choose to run - #[error("{0}")] - InternalPostgres(#[source] postgres_client::Error), - #[error("{0}")] - JsonConversion(#[from] JsonConversionError), - #[error("{0}")] - Cancelled(SqlOverHttpCancel), - #[error("{0}")] - SubzeroCore(#[source] SubzeroCoreError), -} - -impl ReportableError for RestError { - fn get_error_kind(&self) -> ErrorKind { - match self { - RestError::ReadPayload(e) => e.get_error_kind(), - RestError::ConnectCompute(e) => e.get_error_kind(), - RestError::ConnInfo(e) => e.get_error_kind(), - RestError::ResponseTooLarge(_) => ErrorKind::User, - RestError::InvalidIsolationLevel => ErrorKind::User, - RestError::Postgres(p) => p.get_error_kind(), - RestError::InternalPostgres(p) => { - if p.as_db_error().is_some() { - ErrorKind::Service - } else { - ErrorKind::Compute - } - } - RestError::JsonConversion(_) => ErrorKind::Postgres, - RestError::Cancelled(c) => c.get_error_kind(), - RestError::SubzeroCore(s) => ErrorKind::User, - } - } -} - -impl UserFacingError for RestError { - fn to_string_client(&self) -> String { - match self { - RestError::ReadPayload(p) => p.to_string(), - RestError::ConnectCompute(c) => c.to_string_client(), - RestError::ConnInfo(c) => c.to_string_client(), - RestError::ResponseTooLarge(_) => self.to_string(), - RestError::InvalidIsolationLevel => self.to_string(), - RestError::Postgres(p) => p.to_string(), - RestError::InternalPostgres(p) => p.to_string(), - RestError::JsonConversion(_) => "could not parse postgres response".to_string(), - RestError::Cancelled(_) => self.to_string(), - RestError::SubzeroCore(s) => { - // TODO: this is a hack to get the message from the json body - let json = s.json_body(); - let default_message = "Unknown error".to_string(); - let message = json.get("message").map_or(default_message.clone(), |m| - match m { - JsonValue::String(s) => s.clone(), - _ => default_message, - } - ); - message - } - } - } -} - -impl HttpCodeError for RestError { - fn get_http_status_code(&self) -> StatusCode { - match self { - RestError::ReadPayload(e) => e.get_http_status_code(), - RestError::ConnectCompute(h) => match h.get_error_kind() { - ErrorKind::User => StatusCode::BAD_REQUEST, - _ => StatusCode::INTERNAL_SERVER_ERROR, - }, - RestError::ConnInfo(_) => StatusCode::BAD_REQUEST, - RestError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE, - RestError::InvalidIsolationLevel => StatusCode::BAD_REQUEST, - RestError::Postgres(_) => StatusCode::BAD_REQUEST, - RestError::InternalPostgres(_) => StatusCode::INTERNAL_SERVER_ERROR, - RestError::JsonConversion(_) => StatusCode::INTERNAL_SERVER_ERROR, - RestError::Cancelled(_) => StatusCode::INTERNAL_SERVER_ERROR, - RestError::SubzeroCore(e) => { - let status = e.status_code(); - StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) - } - } - } -} - -#[derive(Debug, thiserror::Error)] -pub(crate) enum ReadPayloadError { - #[error("could not read the HTTP request body: {0}")] - Read(#[from] 
hyper::Error), - #[error("request is too large (max is {limit} bytes)")] - BodyTooLarge { limit: usize }, - #[error("could not parse the HTTP request body: {0}")] - Parse(#[from] serde_json::Error), -} - -impl From> for ReadPayloadError { - fn from(value: ReadBodyError) -> Self { - match value { - ReadBodyError::BodyTooLarge { limit } => Self::BodyTooLarge { limit }, - ReadBodyError::Read(e) => Self::Read(e), - } - } -} - -impl ReportableError for ReadPayloadError { - fn get_error_kind(&self) -> ErrorKind { - match self { - ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect, - ReadPayloadError::BodyTooLarge { .. } => ErrorKind::User, - ReadPayloadError::Parse(_) => ErrorKind::User, - } - } -} - -impl HttpCodeError for ReadPayloadError { - fn get_http_status_code(&self) -> StatusCode { - match self { - ReadPayloadError::Read(_) => StatusCode::BAD_REQUEST, - ReadPayloadError::BodyTooLarge { .. } => StatusCode::PAYLOAD_TOO_LARGE, - ReadPayloadError::Parse(_) => StatusCode::BAD_REQUEST, - } - } -} - -#[derive(Debug, thiserror::Error)] -pub(crate) enum SqlOverHttpCancel { - #[error("query was cancelled")] - Postgres, - #[error("query was cancelled while stuck trying to connect to the database")] - Connect, -} - -impl ReportableError for SqlOverHttpCancel { - fn get_error_kind(&self) -> ErrorKind { - match self { - SqlOverHttpCancel::Postgres => ErrorKind::ClientDisconnect, - SqlOverHttpCancel::Connect => ErrorKind::ClientDisconnect, - } - } -} - async fn handle_inner( _cancel: CancellationToken, @@ -627,138 +781,11 @@ async fn handle_inner( } -pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue { - let mut uuid = [0; uuid::fmt::Hyphenated::LENGTH]; - HeaderValue::from_str(id.as_hyphenated().encode_lower(&mut uuid[..])) - .expect("uuid hyphenated format should be all valid header characters") -} - -static HEADERS_TO_STRIP: &[&HeaderName] = &[ - //AUTHORIZATION, - &NEON_REQUEST_ID, - &CONN_STRING, - &RAW_TEXT_OUTPUT, - &ARRAY_MODE, - &TXN_ISOLATION_LEVEL, - &TXN_READ_ONLY, - &TXN_DEFERRABLE, -]; - -static JSON_SCHEMA: &str = r#" - { - "schemas":[ - { - "name":"test", - "objects":[ - { - "kind":"table", - "name":"items", - "columns":[ - { - "name":"id", - "data_type":"integer", - "primary_key":true - }, - { - "name":"name", - "data_type":"text" - } - ], - "foreign_keys":[], - "permissions":[] - } - ] - } - ] - } -"#; - -pub fn fmt_env_query<'a>(env: &'a HashMap<&'a str, &'a str>) -> Snippet<'a> { - "select " - + if env.is_empty() { - sql("null") - } else { - env.iter() - .map(|(k, v)| "set_config(" + param(k as &SqlParam) + ", " + param(v as &SqlParam) + ", true)") - .join(",") - } -} - -fn current_schema(db_schemas: &Vec, method: &Method, headers: &HeaderMap) -> Result { - match (db_schemas.len() > 1, method, headers.get("accept-profile"), headers.get("content-profile")) { - (false, ..) 
=> Ok(db_schemas.first().unwrap_or(&"_inexistent_".to_string()).clone()), - (_, &Method::DELETE, _, Some(content_profile_header)) - | (_, &Method::POST, _, Some(content_profile_header)) - | (_, &Method::PATCH, _, Some(content_profile_header)) - | (_, &Method::PUT, _, Some(content_profile_header)) => { - match content_profile_header.to_str() { - Ok(content_profile_str) => { - let content_profile = String::from(content_profile_str); - if db_schemas.contains(&content_profile) { - Ok(content_profile) - } else { - Err(SubzeroCoreError::UnacceptableSchema { - schemas: db_schemas.clone(), - }) - } - } - Err(_) => Err(SubzeroCoreError::UnacceptableSchema { - schemas: db_schemas.clone(), - }) - } - } - (_, _, Some(accept_profile_header), _) => { - match accept_profile_header.to_str() { - Ok(accept_profile_str) => { - let accept_profile = String::from(accept_profile_str); - if db_schemas.contains(&accept_profile) { - Ok(accept_profile) - } else { - Err(SubzeroCoreError::UnacceptableSchema { - schemas: db_schemas.clone(), - }) - } - } - Err(_) => Err(SubzeroCoreError::UnacceptableSchema { - schemas: db_schemas.clone(), - }) - } - } - _ => Ok(db_schemas.first().unwrap_or(&"_inexistent_".to_string()).clone()), - } -} -pub fn to_core_error(e: SubzeroCoreError) -> RestError { - RestError::SubzeroCore(e) -} - - -fn to_sql_param(p: &Param) -> JsonValue { - match p { - SV(SingleVal(v, ..)) => { - JsonValue::String(v.to_string()) - } - Str(v) => { - JsonValue::String(v.to_string()) - } - StrOwned(v) => { - JsonValue::String((*v).clone()) - } - PL(Payload(v, ..)) => { - JsonValue::String(v.clone().into_owned()) - } - LV(ListVal(v, ..)) => { - if !v.is_empty() { - JsonValue::String(format!( - "{{\"{}\"}}", - v.iter() - .map(|e| e.replace('\\', "\\\\").replace('\"', "\\\"")) - .collect::>() - .join("\",\"") - )) - } else { - JsonValue::String(r#"{}"#.to_string()) - } - } +// Helper function to extract optional string from JSON +fn extract_string(json: &mut serde_json::Value, key: &str) -> Option { + match json[key].take() { + JsonValue::String(s) => Some(s), + _ => None, } } @@ -770,30 +797,33 @@ async fn handle_rest_inner( jwt: String, backend: Arc, ) -> Result>, RestError> { - let mut response_headers = vec![]; - // hardcoded values for now - let max_http_body_size = 10 * 1024 * 1024; // 10MB limit + + // hardcoded values for now, these should come from a config per tenant + let max_http_body_size = 10 * 1024 * 1024; // 10MB limit let db_schemas = Vec::from(["test".to_string()]); // list of schemas available for the api let mut db_schema_parsed = serde_json::from_str::(JSON_SCHEMA) // database schema shape (will come from introspection) .map_err(|e| RestError::SubzeroCore(JsonDeserialize {source:e }))?; - let disable_internal_permissions = true; // in the context of neon we emulate postgrest (so no internal permissions checks) + //let disable_internal_permissions = true; // in the context of neon we emulate postgrest (so no internal permissions checks) db_schema_parsed.use_internal_permissions = false; // TODO: change the introspection query to auto set this to false depending on params let db_schema = &db_schema_parsed; let api_prefix = "/rest/v1/"; let db_extra_search_path = "public, extensions".to_string(); let role_claim_key = ".role".to_string(); let role_claim_path = format!("${}", role_claim_key); - println!("role_claim_path: {:?}", role_claim_path); let db_anon_role = Some("anon".to_string()); //let max_rows = Some("1000".to_string()); let max_rows = None; let db_allowed_select_functions = 
DEFAULT_SAFE_SELECT_FUNCTIONS.iter().map(|m| *m).collect::>(); + // end hardcoded values + // validate the jwt token let jwt_parsed = backend - .authenticate_with_jwt(ctx, &conn_info.user_info, jwt.clone()) //TODO: do not clone jwt + .authenticate_with_jwt(ctx, &conn_info.user_info, jwt) .await .map_err(HttpConnError::from)?; + + // extract the jwt claims (we'll need them later to set the role and env) let jwt_claims = match jwt_parsed.keys { ComputeCredentialKeys::JwtPayload(payload_bytes) => { // `payload_bytes` contains the raw JWT payload as Vec @@ -806,7 +836,8 @@ async fn handle_rest_inner( None } }; - println!("jwt_payload: {:?}", &jwt_claims); + + // read the role from the jwt claims (and set it to the "anon" role if not present) let (role, authenticated) = match &jwt_claims { Some(claims) => match select(claims, &role_claim_path) { Ok(v) => match &v[..] { @@ -817,25 +848,18 @@ async fn handle_rest_inner( }, None => Ok((db_anon_role.as_ref(), false)), }?; - println!("role: {:?}", role); - println!("authenticated: {:?}", authenticated); - + // do not allow unauthenticated requests when there is no anonymous role setup if let (None, false) = (role, authenticated) { return Err(RestError::SubzeroCore(JwtTokenInvalid { message: "unauthenticated requests not allowed".to_string(), })); } - // println!("jwt: {:?}", jwt.keys); - - let role = match role { - Some(r) => r, - None => "", - }; - + // start deconstructing the request because subzero core mostly works with &str let (parts, originial_body) = request.into_parts(); - let method = parts.method.to_string(); + let method = parts.method; + let method_str = method.to_string(); let path = parts.uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/"); // this is actually the table name (or rpc/function_name) @@ -847,36 +871,35 @@ async fn handle_rest_inner( })), }?; - let schema_name = ¤t_schema(&db_schemas, &parts.method, &parts.headers).map_err(RestError::SubzeroCore)?; + // pick the current schema from the headers (or the first one from config) + let schema_name = ¤t_schema(&db_schemas, &method, &parts.headers).map_err(RestError::SubzeroCore)?; + + // add the content-profile header to the response + let mut response_headers = vec![]; if db_schemas.len() > 1 { response_headers.push(("Content-Profile".to_string(), schema_name.clone())); } - - let body = Full::new(Bytes::new()).map_err(|never| match never {}).boxed(); - - - // print all the local variables - println!("schema_name: {:?}", schema_name); - println!("db_schemas: {:?}", db_schemas); - println!("db_schema: {:?}", db_schema); - println!("root: {:?}", root); - println!("method: {:?}", method); - println!("path: {:?}", path); - println!("response_headers: {:?}", response_headers); - println!("originial_body: {:?}", originial_body); - //println!("parts: {:?}", parts); - - println!("conn_info: {:?}", conn_info); - println!("jwt: {:?}", jwt); - + // parse the query string into a Vec<(&str, &str)> let query = match parts.uri.query() { Some(q) => form_urlencoded::parse(q.as_bytes()).collect(), None => vec![], }; let get: Vec<(&str, &str)> = query.iter().map(|(k, v)| (&**k, &**v)).collect(); - let headers_map = parts.headers; + let mut headers_map = parts.headers; + let local_proxy_uri = ::http::Uri::from_static("http://proxy.local/sql"); + let mut req = Request::builder().method(Method::POST).uri(local_proxy_uri); + + // todo(conradludgate): maybe auth-broker should parse these and re-serialize + // these instead just to ensure they remain normalised. 
+ for &h in HEADERS_TO_FORWARD { + if let Some(hv) = headers_map.remove(h) { + req = req.header(h, hv); + } + } + + // convert the headers map to a HashMap<&str, &str> let headers: HashMap<&str, &str> = headers_map.iter() .map(|(k, v)| (k.as_str(), v.to_str().unwrap_or("__BAD_HEADER__"))) .collect(); @@ -884,17 +907,15 @@ async fn handle_rest_inner( let cookies = HashMap::new(); // TODO: add cookies // Read the request body - let body_bytes = read_body_with_limit(originial_body, max_http_body_size).await.map_err(ReadPayloadError::from)?; // 10MB limit + let body_bytes = read_body_with_limit(originial_body, max_http_body_size).await.map_err(ReadPayloadError::from)?; let body_as_string: Option = if body_bytes.is_empty() { None } else { Some(String::from_utf8_lossy(&body_bytes).into_owned()) }; - println!("ready to parse!!!!!!!"); - let mut api_request = parse(schema_name, root, db_schema, method.as_str(), path, get, body_as_string.as_deref(), headers, cookies, max_rows).map_err(RestError::SubzeroCore)?; - - + // parse the request into an ApiRequest struct + let api_request = parse(schema_name, root, db_schema, method_str.as_str(), path, get, body_as_string.as_deref(), headers, cookies, max_rows).map_err(RestError::SubzeroCore)?; // in case when the role is not set (but authenticated through jwt) the query will be executed with the privileges // of the "authenticator" role unless the DbSchema has internal privileges set @@ -902,43 +923,48 @@ async fn handle_rest_inner( // replace "*" with the list of columns the user has access to // so that he does not encounter permission errors // replace_select_star(db_schema, schema_name, role, &mut api_request.query).map_err(to_core_error)?; - println!("after replace_select_star !!!!!!!"); - - if !disable_internal_permissions { - // check_privileges(db_schema, schema_name, role, &api_request).map_err(to_core_error)?; - println!("after check_privileges !!!!!!!"); - } - println!("after check_privileges 2 !!!!!!!"); + + let role_str = match role { + Some(r) => r, + None => "", + }; + // this is not relevant when acting as PostgREST + // if !disable_internal_permissions { + // check_privileges(db_schema, schema_name, role_str, &api_request).map_err(to_core_error)?; + // } + check_safe_functions(&api_request, &db_allowed_select_functions).map_err(to_core_error)?; - println!("after check_safe_functions !!!!!!!"); - if !disable_internal_permissions { - insert_policy_conditions(db_schema, schema_name, role, &mut api_request.query).map_err(to_core_error)?; - println!("after insert_policy_conditions !!!!!!!"); - } - - println!("api_request after checks: {:?}", api_request); + + // this is not relevant when acting as PostgREST + // if !disable_internal_permissions { + // insert_policy_conditions(db_schema, schema_name, role_str, &mut api_request.query).map_err(to_core_error)?; + // } // when using internal privileges not switch "current_role" // TODO: why do we need this? 
- let env_role = if !disable_internal_permissions && db_schema.use_internal_permissions { - None - } else { - Some(role) - }; + // let env_role = if !disable_internal_permissions && db_schema.use_internal_permissions { + // None + // } else { + // Some(role_str) + // }; + let env_role = Some(role_str); + // construct the env (passed in to the sql context as GUCs) let empty_json = "{}".to_string(); let headers_env = serde_json::to_string(&api_request.headers).unwrap_or(empty_json.clone()); let cookies_env = serde_json::to_string(&api_request.cookies).unwrap_or(empty_json.clone()); let get_env = serde_json::to_string(&api_request.get).unwrap_or(empty_json.clone()); - let jwt_claims_env = jwt_claims.as_ref().map(|v| serde_json::to_string(v).unwrap_or(empty_json.clone())) - .unwrap_or( - if let Some(r) = env_role { - let claims: HashMap<&str, &str> = HashMap::from([("role", r)]); - serde_json::to_string(&claims).unwrap_or(empty_json.clone()) - } else { - empty_json.clone() - } - ); + let jwt_claims_env = jwt_claims + .as_ref() + .map(|v| serde_json::to_string(v).unwrap_or(empty_json.clone())) + .unwrap_or( + if let Some(r) = env_role { + let claims: HashMap<&str, &str> = HashMap::from([("role", r)]); + serde_json::to_string(&claims).unwrap_or(empty_json.clone()) + } else { + empty_json.clone() + } + ); let mut env: HashMap<&str, &str> = HashMap::from([ ("request.method", api_request.method), ("request.path", api_request.path), @@ -951,43 +977,22 @@ async fn handle_rest_inner( if let Some(r) = env_role { env.insert("role", r.into()); } + + // generate the sql statements let (env_statement, env_parameters, _) = generate(fmt_env_query(&env)); let (main_statement, main_parameters, _) = generate(fmt_main_query(db_schema, api_request.schema_name, &api_request, &env).map_err(to_core_error)?); - - println!("env_statement: {:?} \n env_parameters: {:?}", env_statement, env_parameters); - println!("main_statement: {:?} \n main_parameters: {:?}", main_statement, main_parameters); - - // now we are ready to send the request to the local proxy + // now we are ready to build the request to the local proxy let mut client = backend.connect_to_local_proxy(ctx, conn_info).await?; - - let local_proxy_uri = ::http::Uri::from_static("http://proxy.local/sql"); - - let mut req = Request::builder().method(Method::POST).uri(local_proxy_uri); - - // todo(conradludgate): maybe auth-broker should parse these and re-serialize - // these instead just to ensure they remain normalised. 
- // for &h in HEADERS_TO_FORWARD { - // if let Some(hv) = parts.headers.remove(h) { - // req = req.header(h, hv); - // } - // } - // forward all headers except the ones in HEADERS_TO_STRIP - for (h, v) in headers_map.iter() { - if !HEADERS_TO_STRIP.contains(&h) { - req = req.header(h, v); - } - } req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id())); req = req.header(&CONN_STRING, HeaderValue::from_str(connection_string).unwrap()); + req = req.header(&TXN_ISOLATION_LEVEL, HeaderValue::from_str("ReadCommitted").unwrap()); + if api_request.read_only { + req = req.header(&TXN_READ_ONLY, HeaderValue::from_str("true").unwrap()); + } - // FIXME: remove this header - req = req.header(&HACK_TRUST_ROLE_SWITCHING, HeaderValue::from_static("true")); - - - - + // convert the parameters from subzero core representation to a Vec let env_parameters_json = env_parameters.iter().map(|p| to_sql_param(&p.to_param())).collect::>(); let main_parameters_json = main_parameters.iter().map(|p| to_sql_param(&p.to_param())).collect::>(); let body: String = json!({ @@ -1007,14 +1012,16 @@ async fn handle_rest_inner( .map_err(|never| match never {}) // Convert Infallible to hyper::Error .boxed(); - let req = req .body(body_boxed) - .expect("all headers and params received via hyper should be valid for request"); + .map_err(|_| RestError::SubzeroCore(InternalError { + message: "Failed to build request".to_string() + }))?; // todo: map body to count egress - let _metrics = client.metrics(ctx); + let _metrics = client.metrics(ctx); // FIXME: is everything in the context set correctly? + // send the request to the local proxy let response = client .inner .inner @@ -1023,6 +1030,8 @@ async fn handle_rest_inner( .map_err(LocalProxyConnError::from) .map_err(HttpConnError::from)?; + let response_status = response.status(); + // Capture the response body let response_body = response .collect() @@ -1031,11 +1040,24 @@ async fn handle_rest_inner( .to_bytes(); // Parse the JSON response and extract the body content efficiently - //let body_string = { let mut response_json: serde_json::Value = serde_json::from_slice(&response_body) .map_err(|e| RestError::SubzeroCore(JsonDeserialize { source: e }))?; - println!("Response from local proxy: {:?}", response_json); + + + // if the response status is greater than 399, then it is an error + // FIXME: check if there are other error codes or shapes of the response + if response_status.as_u16() > 399 { + // turn this postgres error from the json into PostgresError + let postgres_error = PostgresError { + message: extract_string(&mut response_json, "message").unwrap_or_default(), + code: extract_string(&mut response_json, "code").unwrap_or_default(), + detail: extract_string(&mut response_json, "detail"), + hint: extract_string(&mut response_json, "hint"), + }; + + return Err(RestError::Postgres(postgres_error)); + } // Extract the second query result (main query) let results = response_json["results"].as_array_mut() @@ -1062,53 +1084,169 @@ async fn handle_rest_inner( } // Extract columns from the first (and only) row - let row = &mut rows[0]; - + let mut row = &mut rows[0]; + let body_string = extract_string(&mut row, "body").unwrap_or_default(); + let page_total = extract_string(&mut row, "page_total"); + let total_result_set = extract_string(&mut row, "total_result_set"); + // constraints_satisfied is relevant only when using internal permissions + // let constraints_satisfied = extract_string(&mut row, "constraints_satisfied"); + let response_headers_json = 
extract_string(&mut row, "response_headers");
+    let response_status = extract_string(&mut row, "response_status");
-    // Extract the owned String directly from the JsonValue (without copying)
-    let body_string = match row["body"].take() {
-        JsonValue::String(s) => s,
-        _ => {
-            return Err(RestError::SubzeroCore(InternalError {
-                message: "Missing 'body' field".to_string()
-            }));
-        }
+    // build the intermediate response object
+    let api_response = ApiResponse {
+        page_total: page_total.map(|v| v.parse::<i64>().unwrap_or(0)).unwrap_or(0),
+        total_result_set: total_result_set.map(|v| v.parse::<i64>().unwrap_or(0)),
+        top_level_offset: 0, // FIXME: check why this is 0
+        response_headers: response_headers_json,
+        response_status,
+        body: body_string,
     };
-    let page_total = row["page_total"].as_str();
-    let total_result_set = row["total_result_set"].as_str();
-    let constraints_satisfied = row["constraints_satisfied"].as_bool()
-        .unwrap_or(true);
-    let response_headers = row["response_headers"].as_str();
-    let response_status = row["response_status"].as_str();
-    println!("Extracted columns:");
-    println!("  page_total: {:?}", page_total);
-    println!("  total_result_set: {:?}", total_result_set);
-    println!("  constraints_satisfied: {:?}", constraints_satisfied);
-    println!("  response_headers: {:?}", response_headers);
-    println!("  response_status: {:?}", response_status);
-    println!("  body: {:?}", &body_string);
-
-    //body_string
-    //};
+    // rollback the transaction if the page_total is not 1 and the accept_content_type is SingularJSON
+    // we can not do this in the context of proxy for now
+    // if api_request.accept_content_type == SingularJSON && api_response.page_total != 1 {
+    //     transaction.rollback().await.context(PgDbSnafu { authenticated })?;
+    //     return Err(to_core_error(SingularityError {
+    //         count: api_response.page_total,
+    //         content_type: "application/vnd.pgrst.object+json".to_string(),
+    //     }));
+    // }
-    // For now, return the body content as the response - Bytes::from(String) consumes without copying
-    let response_body = Full::new(Bytes::from(body_string))
+    // rollback the transaction if the page_total is not 1 and the method is PUT
+    // we can not do this in the context of proxy for now
+    // if api_request.method == Method::PUT && api_response.page_total != 1 {
+    //     // Makes sure the querystring pk matches the payload pk
+    //     // e.g. PUT /items?id=eq.1 { "id" : 1, .. } is accepted,
+    //     // PUT /items?id=eq.14 { "id" : 2, .. } is rejected.
+    //     // If this condition is not satisfied then nothing is inserted,
+    //     transaction.rollback().await.context(PgDbSnafu { authenticated })?;
+    //     return Err(to_core_error(PutMatchingPkError));
+    // }
+
+    // create and return the response to the client
+    // this section mostly deals with setting the right headers according to PostgREST specs
+    let page_total = api_response.page_total;
+    let total_result_set = api_response.total_result_set;
+    let top_level_offset = api_response.top_level_offset;
+    let response_content_type = match (&api_request.accept_content_type, &api_request.query.node) {
+        (SingularJSON, _)
+        | (
+            _,
+            FunctionCall {
+                returns_single: true,
+                is_scalar: false,
+                .. 
+ }, + ) => SingularJSON, + (TextCSV, _) => TextCSV, + _ => ApplicationJSON, + }; + + // check if the SQL env set some response headers (happens when we called a rpc function) + if let Some(response_headers_str) = api_response.response_headers { + match serde_json::from_str(response_headers_str.as_str()) { + Ok(JsonValue::Array(headers_json)) => { + for h in headers_json { + match h { + JsonValue::Object(o) => { + for (k, v) in o.into_iter() { + match v { + JsonValue::String(s) => { + response_headers.push((k, s)); + Ok(()) + } + _ => Err(RestError::SubzeroCore(GucHeadersError)), + }? + } + Ok(()) + } + _ => Err(RestError::SubzeroCore(GucHeadersError)), + }? + } + Ok(()) + } + _ => Err(RestError::SubzeroCore(GucHeadersError)), + }? + } + + // calculate and set the content range header + let lower = top_level_offset as i64; + let upper = top_level_offset as i64 + page_total as i64 - 1; + let total = total_result_set.map(|t| t as i64); + let content_range = match (&method, &api_request.query.node) { + (&Method::POST, Insert { .. }) => content_range_header(1, 0, total), + (&Method::DELETE, Delete { .. }) => content_range_header(1, upper, total), + _ => content_range_header(lower, upper, total), + }; + response_headers.push(("Content-Range".to_string(), content_range)); + + // calculate the status code + #[rustfmt::skip] + let mut status = match (&method, &api_request.query.node, page_total, &api_request.preferences) { + (&Method::POST, Insert { .. }, ..) => 201, + (&Method::DELETE, Delete { .. }, _, Some(Preferences {representation: Some(Representation::Full),..}),) => 200, + (&Method::DELETE, Delete { .. }, ..) => 204, + (&Method::PATCH, Update { columns, .. }, 0, _) if !columns.is_empty() => 404, + (&Method::PATCH, Update { .. }, _,Some(Preferences {representation: Some(Representation::Full),..}),) => 200, + (&Method::PATCH, Update { .. }, ..) => 204, + (&Method::PUT, Insert { .. },_,Some(Preferences {representation: Some(Representation::Full),..}),) => 200, + (&Method::PUT, Insert { .. }, ..) => 204, + _ => content_range_status(lower, upper, total), + }; + + // add the preference-applied header + if let Some(Preferences { resolution: Some(r), .. 
}) = api_request.preferences {
+        response_headers.push((
+            "Preference-Applied".to_string(),
+            match r {
+                MergeDuplicates => "resolution=merge-duplicates".to_string(),
+                IgnoreDuplicates => "resolution=ignore-duplicates".to_string(),
+            },
+        ));
+    }
+
+    // check if the SQL env set some response status (happens when we called a rpc function)
+    let response_status: Option<String> = api_response.response_status;
+    if let Some(response_status_str) = response_status {
+        status = response_status_str.parse::<u16>().map_err(|_| RestError::SubzeroCore(GucStatusError))?;
+    }
+
+    // set the content type header
+    let http_content_type = match response_content_type {
+        SingularJSON => Ok("application/vnd.pgrst.object+json"),
+        TextCSV => Ok("text/csv"),
+        ApplicationJSON => Ok("application/json"),
+        Other(t) => Err(RestError::SubzeroCore(ContentTypeError {
+            message: format!("None of these Content-Types are available: {t}"),
+        })),
+    }?;
+
+    // build the response body
+    let response_body = Full::new(Bytes::from(api_response.body))
         .map_err(|never| match never {})
         .boxed();
-
-    Ok(Response::builder()
-        .status(StatusCode::OK)
-        .header("content-type", "application/json")
-        .body(response_body)
-        .unwrap())
+
+    // build the response
+    let mut response = Response::builder()
+        .status(StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR))
+        .header("content-type", http_content_type);
+
+    // Add all headers from response_headers vector
+    for (header_name, header_value) in response_headers {
+        response = response.header(header_name, header_value);
+    }
+
+    Ok(response.body(response_body).map_err(|_| RestError::SubzeroCore(InternalError {
+        message: "Failed to build response".to_string()
+    }))?)
 }
 
 #[cfg(test)]
 mod tests {
-    use super::*;
+    //use super::*;
 
     #[test]
     fn test_payload() {